-
Notifications
You must be signed in to change notification settings - Fork 14
/
relations.go
232 lines (191 loc) · 6.44 KB
/
relations.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
package relations
import (
"context"
"errors"
"fmt"
"github.com/nspcc-dev/neofs-sdk-go/bearer"
"github.com/nspcc-dev/neofs-sdk-go/client"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/session"
"github.com/nspcc-dev/neofs-sdk-go/user"
)
// Tokens contains different tokens to perform requests in Relations implementations.
//
// Both fields are optional: the helpers in this file attach a token to the
// request parameters only when the corresponding pointer is non-nil.
type Tokens struct {
// Session, when non-nil, is attached to requests via WithinSession.
Session *session.Object
// Bearer, when non-nil, is attached to requests via WithBearerToken.
Bearer *bearer.Token
}
var (
// ErrNoLeftSibling is an error that must be returned if object doesn't have left sibling in objects chain.
// In this file getLeftSibling returns it when the header has no previous ID,
// and Get treats it as the normal end of the backward chain walk.
ErrNoLeftSibling = errors.New("no left siblings")
// ErrNoSplitInfo is an error that must be returned if requested object isn't virtual.
// In this file getSplitInfo returns it when the received header has no split ID;
// Get maps it to an empty result for the root object and skips children that report it.
ErrNoSplitInfo = errors.New("no split info")
)
// HeadExecutor describes methods to get object head.
//
// The helpers in this file use it to read split information (with the raw
// flag set on the request parameters), the children list of a linking object,
// and the previous-ID of a split-chain member.
type HeadExecutor interface {
// ObjectHead returns the header of object objectID from container containerID,
// or an error. Request options (tokens, raw flag) are carried in prm.
ObjectHead(ctx context.Context, containerID cid.ID, objectID oid.ID, signer user.Signer, prm client.PrmObjectHead) (*object.Object, error)
}
// SearchExecutor describes methods to search objects.
//
// In this file it is used to look up objects by their parent ID.
type SearchExecutor interface {
// ObjectSearchInit starts an object search in container containerID using the
// filters and tokens carried in prm, and returns a reader over the found IDs.
ObjectSearchInit(ctx context.Context, containerID cid.ID, signer user.Signer, prm client.PrmObjectSearch) (*client.ObjectListReader, error)
}
// Executor describes all methods required to find all siblings for object.
// It is the union of HeadExecutor and SearchExecutor and is what Get expects.
type Executor interface {
HeadExecutor
SearchExecutor
}
// Get returns all related phy objects for provided root object ID in split-chain order, without linking object id.
// If linking object is found its id will be returned in the second result variable.
//
// Result doesn't include root object ID itself.
//
// Resolution strategy:
//  1. Read split info for the root object. If getSplitInfo reports
//     ErrNoSplitInfo, an empty non-nil slice and a nil error are returned.
//  2. If the split info has no link, search objects whose parent is the root
//     and take the link (preferred) or the last part from their split info.
//  3. If a link is known, return the children listed in the linking object's
//     header together with the linking object's ID.
//  4. Otherwise walk from the last part backwards through previous-IDs until
//     ErrNoLeftSibling, and return the chain in first-to-last order with a nil
//     linking ID. A repeated ID during the walk is reported as an error.
func Get(ctx context.Context, executor Executor, containerID cid.ID, rootObjectID oid.ID, tokens Tokens, signer user.Signer) ([]oid.ID, *oid.ID, error) {
	splitInfo, err := getSplitInfo(ctx, executor, containerID, rootObjectID, tokens, signer)
	if err != nil {
		if errors.Is(err, ErrNoSplitInfo) {
			return []oid.ID{}, nil, nil
		}

		return nil, nil, err
	}

	// collect split chain by the descending ease of operations (ease is evaluated heuristically).
	// If any approach fails, we don't try the next since we assume that it will fail too.
	if _, ok := splitInfo.Link(); !ok {
		// the list is expected to contain last part and (probably) split info
		list, err := findSiblingByParentID(ctx, executor, containerID, rootObjectID, tokens, signer)
		if err != nil {
			return nil, nil, fmt.Errorf("children: %w", err)
		}

		for _, id := range list {
			split, err := getSplitInfo(ctx, executor, containerID, id, tokens, signer)
			if err != nil {
				if errors.Is(err, ErrNoSplitInfo) {
					continue
				}

				return nil, nil, fmt.Errorf("split info: %w", err)
			}

			// A linking object is the cheapest way to get the chain: stop looking.
			if link, ok := split.Link(); ok {
				splitInfo.SetLink(link)
				break
			}

			if last, ok := split.LastPart(); ok {
				splitInfo.SetLastPart(last)
			}
		}
	}

	if idLinking, ok := splitInfo.Link(); ok {
		children, err := listChildrenByLinker(ctx, executor, containerID, idLinking, tokens, signer)
		if err != nil {
			return nil, nil, fmt.Errorf("linking object's header: %w", err)
		}

		return children, &idLinking, nil
	}

	idMember, ok := splitInfo.LastPart()
	if !ok {
		return nil, nil, errors.New("missing any data in received object split information")
	}

	// The chain is discovered from the last part towards the first one, so it
	// is accumulated in reverse order and flipped once at the end. This avoids
	// reallocating and copying the whole slice on every step.
	chain := []oid.ID{idMember}
	// chainSet guards against cycles in previous-ID links.
	chainSet := map[oid.ID]struct{}{idMember: {}}

	for {
		idMember, err = getLeftSibling(ctx, executor, containerID, idMember, tokens, signer)
		if err != nil {
			if errors.Is(err, ErrNoLeftSibling) {
				break
			}

			return nil, nil, fmt.Errorf("split chain member's header: %w", err)
		}

		if _, ok = chainSet[idMember]; ok {
			return nil, nil, fmt.Errorf("duplicated member in the split chain %s", idMember)
		}

		chain = append(chain, idMember)
		chainSet[idMember] = struct{}{}
	}

	// Restore split-chain order (first part first, last part last).
	for i, j := 0, len(chain)-1; i < j; i, j = i+1, j-1 {
		chain[i], chain[j] = chain[j], chain[i]
	}

	return chain, nil, nil
}
// getSplitInfo requests the raw header of objID and derives split information
// from the outcome.
//
// Outcomes:
//   - the head request fails with an *object.SplitInfoError: the split info
//     carried by that error is returned;
//   - the head request fails with any other error: it is wrapped and returned;
//   - the header has no split ID: ErrNoSplitInfo is returned;
//   - otherwise a new split info with the header's split ID is built. If the
//     header has a parent, objID is recorded as the link when the header lists
//     children, and as the last part when it does not.
func getSplitInfo(ctx context.Context, header HeadExecutor, cnrID cid.ID, objID oid.ID, tokens Tokens, signer user.Signer) (*object.SplitInfo, error) {
	var prm client.PrmObjectHead
	if bearerToken := tokens.Bearer; bearerToken != nil {
		prm.WithBearerToken(*bearerToken)
	}
	if sessionToken := tokens.Session; sessionToken != nil {
		prm.WithinSession(*sessionToken)
	}
	prm.MarkRaw()

	hdr, headErr := header.ObjectHead(ctx, cnrID, objID, signer, prm)
	if headErr != nil {
		var splitErr *object.SplitInfoError
		if !errors.As(headErr, &splitErr) {
			return nil, fmt.Errorf("raw object header: %w", headErr)
		}

		return splitErr.SplitInfo(), nil
	}

	splitID := hdr.SplitID()
	if splitID == nil {
		return nil, ErrNoSplitInfo
	}

	info := object.NewSplitInfo()
	info.SetSplitID(splitID)

	// Without a parent the object contributes only its split ID.
	if !hdr.HasParent() {
		return info, nil
	}

	if len(hdr.Children()) == 0 {
		info.SetLastPart(objID)
	} else {
		info.SetLink(objID)
	}

	return info, nil
}
// findSiblingByParentID searches container cnrID for objects whose parent ID
// equals objID and returns every ID produced by the search reader.
//
// Errors from starting the search are wrapped with "search", errors from
// iterating the results with "iterate". The result is nil when nothing is found.
func findSiblingByParentID(ctx context.Context, searcher SearchExecutor, cnrID cid.ID, objID oid.ID, tokens Tokens, signer user.Signer) ([]oid.ID, error) {
	var filters object.SearchFilters
	filters.AddParentIDFilter(object.MatchStringEqual, objID)

	var prm client.PrmObjectSearch
	prm.SetFilters(filters)
	if bearerToken := tokens.Bearer; bearerToken != nil {
		prm.WithBearerToken(*bearerToken)
	}
	if sessionToken := tokens.Session; sessionToken != nil {
		prm.WithinSession(*sessionToken)
	}

	reader, err := searcher.ObjectSearchInit(ctx, cnrID, signer, prm)
	if err != nil {
		return nil, fmt.Errorf("search: %w", err)
	}

	var found []oid.ID
	// The callback returns false for every ID.
	// NOTE(review): presumably false means "keep iterating" — confirm against
	// the ObjectListReader.Iterate contract.
	collect := func(id oid.ID) bool {
		found = append(found, id)
		return false
	}

	if err = reader.Iterate(collect); err != nil {
		return nil, fmt.Errorf("iterate: %w", err)
	}

	return found, nil
}
// listChildrenByLinker requests the header of objID (expected by the caller to
// be a linking object) and returns the children IDs listed in that header.
// A failed head request is wrapped and returned.
func listChildrenByLinker(ctx context.Context, header HeadExecutor, cnrID cid.ID, objID oid.ID, tokens Tokens, signer user.Signer) ([]oid.ID, error) {
	var headPrm client.PrmObjectHead
	if bearerToken := tokens.Bearer; bearerToken != nil {
		headPrm.WithBearerToken(*bearerToken)
	}
	if sessionToken := tokens.Session; sessionToken != nil {
		headPrm.WithinSession(*sessionToken)
	}

	linker, headErr := header.ObjectHead(ctx, cnrID, objID, signer, headPrm)
	if headErr != nil {
		return nil, fmt.Errorf("linking object's header: %w", headErr)
	}

	children := linker.Children()

	return children, nil
}
// getLeftSibling requests the header of objID and returns the previous-ID
// recorded in it. It returns ErrNoLeftSibling (unwrapped) when the header has
// no previous ID, and a wrapped error when the head request itself fails.
// The returned ID is the zero value whenever the error is non-nil.
func getLeftSibling(ctx context.Context, header HeadExecutor, cnrID cid.ID, objID oid.ID, tokens Tokens, signer user.Signer) (oid.ID, error) {
	var headPrm client.PrmObjectHead
	if bearerToken := tokens.Bearer; bearerToken != nil {
		headPrm.WithBearerToken(*bearerToken)
	}
	if sessionToken := tokens.Session; sessionToken != nil {
		headPrm.WithinSession(*sessionToken)
	}

	member, headErr := header.ObjectHead(ctx, cnrID, objID, signer, headPrm)
	if headErr != nil {
		return oid.ID{}, fmt.Errorf("split chain member's header: %w", headErr)
	}

	if previous, found := member.PreviousID(); found {
		return previous, nil
	}

	return oid.ID{}, ErrNoLeftSibling
}