Skip to content

Commit

Permalink
object: Stop supporting __NEOFS__NETMAP* X-headers
Browse files Browse the repository at this point in the history
They are not supported since
nspcc-dev/neofs-api#279. Now storage nodes
process the current epoch only.

Refs #1194.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Feb 28, 2024
1 parent 34eeda6 commit a4de38b
Show file tree
Hide file tree
Showing 11 changed files with 33 additions and 419 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Changelog for NeoFS Node

### Removed
- Object notifications incl. NATS (#2750)
- Supporting of `__NEOFS__NETMAP*` X-headers (#2751)

### Updated

Expand All @@ -22,6 +23,9 @@ supported. All NATS servers running for this purpose only are no longer needed.
If your app depends on notifications transmitted to NATS, do not update and
create an issue please.

Stop attaching `__NEOFS__NETMAP*` X-headers to NeoFS API requests. If your app
is somehow tied to them, do not update and create an issue please.

## [0.40.1] - 2024-02-22

### Fixed
Expand Down
35 changes: 10 additions & 25 deletions pkg/services/object/get/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,41 +13,26 @@ func (exec *execCtx) executeOnContainer() {
return
}

lookupDepth := exec.netmapLookupDepth()

exec.log.Debug("trying to execute in container...",
zap.Uint64("netmap lookup depth", lookupDepth),
)
exec.log.Debug("trying to execute in container...")

// initialize epoch number
ok := exec.initEpoch()
if !ok {
epoch, err := exec.svc.currentEpochReceiver.currentEpoch()
if err != nil {
exec.status = statusUndefined
exec.err = err
exec.log.Debug("could not get current epoch number", zap.Error(err))

Check warning on line 23 in pkg/services/object/get/container.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/get/container.go#L21-L23

Added lines #L21 - L23 were not covered by tests
return
}

for {
if exec.processCurrentEpoch() {
break
}

// check the maximum depth has been reached
if lookupDepth == 0 {
break
}

lookupDepth--

// go to the previous epoch
exec.curProcEpoch--
}
exec.processEpoch(epoch)
}

func (exec *execCtx) processCurrentEpoch() bool {
func (exec *execCtx) processEpoch(epoch uint64) bool {
exec.log.Debug("process epoch",
zap.Uint64("number", exec.curProcEpoch),
zap.Uint64("number", epoch),
)

traverser, ok := exec.generateTraverser(exec.address())
traverser, ok := exec.generateTraverser(exec.address(), epoch)
if !ok {
return true
}
Expand Down
38 changes: 2 additions & 36 deletions pkg/services/object/get/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ type execCtx struct {
curOff uint64

head bool

curProcEpoch uint64
}

type execOption func(*execCtx)
Expand Down Expand Up @@ -153,42 +151,10 @@ func (exec *execCtx) headOnly() bool {
return exec.head
}

func (exec *execCtx) netmapEpoch() uint64 {
return exec.prm.common.NetmapEpoch()
}

func (exec *execCtx) netmapLookupDepth() uint64 {
return exec.prm.common.NetmapLookupDepth()
}

func (exec *execCtx) initEpoch() bool {
exec.curProcEpoch = exec.netmapEpoch()
if exec.curProcEpoch > 0 {
return true
}

e, err := exec.svc.currentEpochReceiver.currentEpoch()

switch {
default:
exec.status = statusUndefined
exec.err = err

exec.log.Debug("could not get current epoch number",
zap.String("error", err.Error()),
)

return false
case err == nil:
exec.curProcEpoch = e
return true
}
}

func (exec *execCtx) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
func (exec *execCtx) generateTraverser(addr oid.Address, epoch uint64) (*placement.Traverser, bool) {
obj := addr.Object()

t, err := exec.svc.traverserGenerator.GenerateTraverser(addr.Container(), &obj, exec.curProcEpoch)
t, err := exec.svc.traverserGenerator.GenerateTraverser(addr.Container(), &obj, epoch)

switch {
default:
Expand Down
127 changes: 0 additions & 127 deletions pkg/services/object/get/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1117,130 +1117,3 @@ func TestGetRemoteSmall(t *testing.T) {
})
})
}

func TestGetFromPastEpoch(t *testing.T) {
ctx := context.Background()

var cnr container.Container
cnr.SetPlacementPolicy(netmaptest.PlacementPolicy())

var idCnr cid.ID
cnr.CalculateID(&idCnr)

addr := oidtest.Address()
addr.SetContainer(idCnr)

payloadSz := uint64(10)
payload := make([]byte, payloadSz)
_, _ = rand.Read(payload)

obj := generateObject(addr, nil, payload)

ns, as := testNodeMatrix(t, []int{2, 2})

c11 := newTestClient()
c11.addResult(addr, nil, errors.New("any error"))

c12 := newTestClient()
c12.addResult(addr, nil, errors.New("any error"))

c21 := newTestClient()
c21.addResult(addr, nil, errors.New("any error"))

c22 := newTestClient()
c22.addResult(addr, obj, nil)

svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(false)
svc.localStorage = newTestStorage()
svc.assembly = true

const curEpoch = 13

svc.traverserGenerator = &testTraverserGenerator{
c: cnr,
b: map[uint64]placement.Builder{
curEpoch: &testPlacementBuilder{
vectors: map[string][][]netmap.NodeInfo{
addr.EncodeToString(): ns[:1],
},
},
curEpoch - 1: &testPlacementBuilder{
vectors: map[string][][]netmap.NodeInfo{
addr.EncodeToString(): ns[1:],
},
},
},
}

svc.clientCache = &testClientCache{
clients: map[string]*testClient{
as[0][0]: c11,
as[0][1]: c12,
as[1][0]: c21,
as[1][1]: c22,
},
}

svc.currentEpochReceiver = testEpochReceiver(curEpoch)

w := NewSimpleObjectWriter()

commonPrm := new(util.CommonPrm)

p := Prm{}
p.SetObjectWriter(w)
p.SetCommonParameters(commonPrm)
p.WithAddress(addr)

err := svc.Get(ctx, p)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))

commonPrm.SetNetmapLookupDepth(1)

err = svc.Get(ctx, p)
require.NoError(t, err)
require.Equal(t, obj, w.Object())

rp := RangePrm{}
rp.SetChunkWriter(w)
commonPrm.SetNetmapLookupDepth(0)
rp.SetCommonParameters(commonPrm)
rp.WithAddress(addr)

off, ln := payloadSz/3, payloadSz/3

r := objectSDK.NewRange()
r.SetOffset(off)
r.SetLength(ln)

rp.SetRange(r)

err = svc.GetRange(ctx, rp)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))

w = NewSimpleObjectWriter()
rp.SetChunkWriter(w)
commonPrm.SetNetmapLookupDepth(1)

err = svc.GetRange(ctx, rp)
require.NoError(t, err)
require.Equal(t, payload[off:off+ln], w.Object().Payload())

hp := HeadPrm{}
hp.SetHeaderWriter(w)
commonPrm.SetNetmapLookupDepth(0)
hp.SetCommonParameters(commonPrm)
hp.WithAddress(addr)

err = svc.Head(ctx, hp)
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))

w = NewSimpleObjectWriter()
hp.SetHeaderWriter(w)
commonPrm.SetNetmapLookupDepth(1)

err = svc.Head(ctx, hp)
require.NoError(t, err)
require.Equal(t, obj.CutPayload(), w.Object())
}
3 changes: 0 additions & 3 deletions pkg/services/object/get/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
prm.SetContext(exec.context())
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
Expand Down Expand Up @@ -134,7 +133,6 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
prm.SetContext(exec.context())
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
Expand Down Expand Up @@ -181,7 +179,6 @@ func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Objec
prm.SetContext(exec.context())
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
Expand Down
7 changes: 0 additions & 7 deletions pkg/services/object/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,6 @@ type readPrmCommon struct {
commonPrm
}

// SetNetmapEpoch sets the epoch number to be used to locate the object.
//
// By default current epoch on the server will be used.
func (x *readPrmCommon) SetNetmapEpoch(_ uint64) {
// FIXME: (neofs-node#1194) not supported by client
}

// GetObjectPrm groups parameters of GetObject operation.
type GetObjectPrm struct {
readPrmCommon
Expand Down
35 changes: 10 additions & 25 deletions pkg/services/object/search/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,29 @@ func (exec *execCtx) executeOnContainer() {
return
}

lookupDepth := exec.netmapLookupDepth()

exec.log.Debug("trying to execute in container...",
zap.Uint64("netmap lookup depth", lookupDepth),
)
exec.log.Debug("trying to execute in container...")

// initialize epoch number
ok := exec.initEpoch()
if !ok {
epoch, err := exec.svc.currentEpochReceiver.currentEpoch()
if err != nil {
exec.status = statusUndefined
exec.err = err
exec.log.Debug("could not get current epoch number", zap.Error(err))

Check warning on line 25 in pkg/services/object/search/container.go

View check run for this annotation

Codecov / codecov/patch

pkg/services/object/search/container.go#L23-L25

Added lines #L23 - L25 were not covered by tests
return
}

for {
if exec.processCurrentEpoch() {
break
}

// check the maximum depth has been reached
if lookupDepth == 0 {
break
}

lookupDepth--

// go to the previous epoch
exec.curProcEpoch--
}
exec.processEpoch(epoch)

exec.status = statusOK
exec.err = nil
}

func (exec *execCtx) processCurrentEpoch() bool {
func (exec *execCtx) processEpoch(epoch uint64) bool {
exec.log.Debug("process epoch",
zap.Uint64("number", exec.curProcEpoch),
zap.Uint64("number", epoch),
)

traverser, ok := exec.generateTraverser(exec.containerID())
traverser, ok := exec.generateTraverser(exec.containerID(), epoch)
if !ok {
return true
}
Expand Down
38 changes: 2 additions & 36 deletions pkg/services/object/search/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ type execCtx struct {
statusError

log *zap.Logger

curProcEpoch uint64
}

const (
Expand Down Expand Up @@ -66,40 +64,8 @@ func (exec *execCtx) searchFilters() object.SearchFilters {
return exec.prm.filters
}

func (exec *execCtx) netmapEpoch() uint64 {
return exec.prm.common.NetmapEpoch()
}

func (exec *execCtx) netmapLookupDepth() uint64 {
return exec.prm.common.NetmapLookupDepth()
}

func (exec *execCtx) initEpoch() bool {
exec.curProcEpoch = exec.netmapEpoch()
if exec.curProcEpoch > 0 {
return true
}

e, err := exec.svc.currentEpochReceiver.currentEpoch()

switch {
default:
exec.status = statusUndefined
exec.err = err

exec.log.Debug("could not get current epoch number",
zap.String("error", err.Error()),
)

return false
case err == nil:
exec.curProcEpoch = e
return true
}
}

func (exec *execCtx) generateTraverser(cnr cid.ID) (*placement.Traverser, bool) {
t, err := exec.svc.traverserGenerator.generateTraverser(cnr, exec.curProcEpoch)
func (exec *execCtx) generateTraverser(cnr cid.ID, epoch uint64) (*placement.Traverser, bool) {
t, err := exec.svc.traverserGenerator.generateTraverser(cnr, epoch)

switch {
default:
Expand Down
Loading

0 comments on commit a4de38b

Please sign in to comment.