check.go
package policer

import (
	"context"
	"errors"

	"github.com/nspcc-dev/neofs-node/pkg/core/container"
	objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object"
	"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
	headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head"
	"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
	apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
	"github.com/nspcc-dev/neofs-sdk-go/netmap"
	"github.com/nspcc-dev/neofs-sdk-go/object"
	"go.uber.org/zap"
)

// nodeCache tracks the Policer's check progress.
type nodeCache map[uint64]bool

func newNodeCache() *nodeCache {
	m := make(map[uint64]bool)
	return (*nodeCache)(&m)
}

func (n *nodeCache) set(node netmap.NodeInfo, val bool) {
	(*n)[node.Hash()] = val
}

// submits the storage node as a candidate to store the object replica in case
// of shortage.
func (n *nodeCache) submitReplicaCandidate(node netmap.NodeInfo) {
	n.set(node, false)
}

// submits the storage node as a current object replica holder.
func (n *nodeCache) submitReplicaHolder(node netmap.NodeInfo) {
	n.set(node, true)
}

// processStatus returns the current processing status of the storage node:
//
//	>0 if the node does not currently hold the object
//	 0 if the node already holds the object
//	<0 if the node has not been processed yet
func (n *nodeCache) processStatus(node netmap.NodeInfo) int8 {
	val, ok := (*n)[node.Hash()]
	if !ok {
		return -1
	}

	if val {
		return 0
	}

	return 1
}

// SubmitSuccessfulReplication marks the given storage node as a current object
// replica holder.
//
// SubmitSuccessfulReplication implements replicator.TaskResult.
func (n *nodeCache) SubmitSuccessfulReplication(node netmap.NodeInfo) {
	n.submitReplicaHolder(node)
}

// atLeastOneHolder checks whether at least one remote container node holds a
// replica of the object (including as a result of successful replication).
func (n nodeCache) atLeastOneHolder() bool {
	for _, v := range n {
		if v {
			return true
		}
	}

	return false
}
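
// Illustrative usage sketch of nodeCache during a single check pass (hedged;
// nodeA and nodeB are hypothetical netmap.NodeInfo values taken from a
// placement vector, not names used elsewhere in this package):
//
//	cache := newNodeCache()
//	cache.submitReplicaHolder(nodeA)    // HEAD confirmed: nodeA stores the object
//	cache.submitReplicaCandidate(nodeB) // HEAD returned "object not found"
//
//	cache.processStatus(nodeA) // 0: already holds the object
//	cache.processStatus(nodeB) // 1: processed, but does not hold it
//	cache.atLeastOneHolder()   // true, thanks to nodeA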
func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) {
	addr := addrWithType.Address
	idCnr := addr.Container()
	idObj := addr.Object()

	cnr, err := p.cnrSrc.Get(idCnr)
	if err != nil {
		p.log.Error("could not get container",
			zap.Stringer("cid", idCnr),
			zap.String("error", err.Error()),
		)

		if container.IsErrNotFound(err) {
			var prm engine.InhumePrm
			prm.MarkAsGarbage(addrWithType.Address)
			prm.WithForceRemoval()

			_, err := p.jobQueue.localStorage.Inhume(prm)
			if err != nil {
				p.log.Error("could not inhume object with missing container",
					zap.Stringer("cid", idCnr),
					zap.Stringer("oid", idObj),
					zap.String("error", err.Error()))
			}
		}

		return
	}

	policy := cnr.Value.PlacementPolicy()

	nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy)
	if err != nil {
		p.log.Error("could not build placement vector for object",
			zap.Stringer("cid", idCnr),
			zap.String("error", err.Error()),
		)

		return
	}

	c := &processPlacementContext{
		Context:      ctx,
		object:       addrWithType,
		checkedNodes: newNodeCache(),
	}

	for i := range nn {
		select {
		case <-ctx.Done():
			return
		default:
		}

		p.processNodes(c, nn[i], policy.ReplicaNumberByIndex(i))
	}

	// if the context is done, needLocalCopy might not have been calculated
	select {
	case <-ctx.Done():
		return
	default:
	}

	if !c.needLocalCopy {
		if !c.localNodeInContainer {
			// Here we may encounter a special case where the node is not in the network
			// map. In this scenario, it is impossible to determine whether the local node
			// will enter the container in the future or not. At the same time, the rest of
			// the network perceives the local peer as a third party, which may cause
			// replication problems. In order to avoid the potential loss of a single
			// replica, it is held.
			if !p.network.IsLocalNodeInNetmap() {
				p.log.Info("node is outside the network map, holding the replica...",
					zap.Stringer("object", addr),
				)

				return
			}

			// If the local node is outside the object's container and at least one correct
			// replica exists, then the node must not hold an object replica. Otherwise, the
			// node violates the container storage policy declared by its owner. On the
			// other hand, in the complete absence of object replicas, the node must hold
			// the replica to prevent data loss.
			if !c.checkedNodes.atLeastOneHolder() {
				p.log.Info("node outside the container, but nobody stores the object, holding the replica...",
					zap.Stringer("object", addr),
				)

				return
			}

			p.log.Info("node outside the container, removing the replica so as not to violate the storage policy...",
				zap.Stringer("object", addr),
			)
		} else {
			p.log.Info("local replica of the object is redundant in the container, removing...",
				zap.Stringer("object", addr),
			)
		}

		p.cbRedundantCopy(addr)
	}
}
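
// Illustrative outcome of the branch above (a hedged reading with assumed
// numbers, not an exhaustive specification): with REP 2 and a container of
// three nodes, if two remote nodes already answer HEAD with the object before
// the local node is reached, needLocalCopy stays false and the local copy is
// dropped as redundant. A node outside both the container and the network map
// keeps the replica instead, since it cannot tell whether it will enter the
// container later.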
type processPlacementContext struct {
	context.Context

	// whether the local node is in the object's container
	localNodeInContainer bool

	// whether the local node must store a meaningful replica of the object
	// according to the container's storage policy (e.g. as a primary placement
	// node or when such nodes fail the replica check). Can be true only along
	// with localNodeInContainer.
	needLocalCopy bool

	// descriptor of the object for which the policy is being checked
	object objectcore.AddressWithType

	// caches the nodes that have already been processed in previous iterations
	checkedNodes *nodeCache
}
func (p *Policer) processNodes(ctx *processPlacementContext, nodes []netmap.NodeInfo, shortage uint32) {
	prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(ctx.object.Address)

	p.cfg.RLock()
	headTimeout := p.headTimeout
	p.cfg.RUnlock()

	// Number of copies that are stored on maintenance nodes.
	var uncheckedCopies int

	handleMaintenance := func(node netmap.NodeInfo) {
		// Treat remote nodes under maintenance as replica holders. Such nodes
		// MAY not respond with the object, but this is how we prevent spamming
		// the network with new replicas. However, additional copies should not
		// be removed in this case, because that could remove the only copy.
		ctx.checkedNodes.submitReplicaHolder(node)
		shortage--
		uncheckedCopies++

		p.log.Debug("consider node under maintenance as OK",
			zap.String("node", netmap.StringifyPublicKey(node)),
		)
	}

	if ctx.object.Type == object.TypeLock || ctx.object.Type == object.TypeLink {
		// all nodes of a container must store the `LOCK` and `LINK` objects
		// for correct object relations handling:
		// - `LINK` objects allow treating all children as the root object;
		// - `LOCK` and `LINK` objects are broadcast on their PUT requests;
		// - `LOCK` object removal is a prohibited action in the GC.
		shortage = uint32(len(nodes))
	}

	for i := 0; (!ctx.localNodeInContainer || shortage > 0) && i < len(nodes); i++ {
		select {
		case <-ctx.Done():
			return
		default:
		}

		isLocalNode := p.netmapKeys.IsLocalKey(nodes[i].PublicKey())

		if !ctx.localNodeInContainer {
			ctx.localNodeInContainer = isLocalNode
		}

		if shortage == 0 {
			continue
		} else if isLocalNode {
			ctx.needLocalCopy = true

			shortage--
		} else if nodes[i].IsMaintenance() {
			handleMaintenance(nodes[i])
		} else {
			if status := ctx.checkedNodes.processStatus(nodes[i]); status >= 0 {
				if status == 0 {
					// node already contains the replica, no need to replicate
					nodes = append(nodes[:i], nodes[i+1:]...)
					i--
					shortage--
				}

				continue
			}

			callCtx, cancel := context.WithTimeout(ctx, headTimeout)

			_, err := p.remoteHeader.Head(callCtx, prm.WithNodeInfo(nodes[i]))

			cancel()

			if errors.Is(err, apistatus.ErrObjectNotFound) {
				ctx.checkedNodes.submitReplicaCandidate(nodes[i])
				continue
			}

			if errors.Is(err, apistatus.ErrNodeUnderMaintenance) {
				handleMaintenance(nodes[i])
			} else if err != nil {
				p.log.Error("receive object header to check policy compliance",
					zap.Stringer("object", ctx.object.Address),
					zap.String("error", err.Error()),
				)
			} else {
				shortage--
				ctx.checkedNodes.submitReplicaHolder(nodes[i])
			}
		}

		nodes = append(nodes[:i], nodes[i+1:]...)
		i--
	}

	if shortage > 0 {
		p.log.Debug("shortage of object copies detected",
			zap.Stringer("object", ctx.object.Address),
			zap.Uint32("shortage", shortage),
		)

		var task replicator.Task
		task.SetObjectAddress(ctx.object.Address)
		task.SetNodes(nodes)
		task.SetCopiesNumber(shortage)

		p.replicator.HandleTask(ctx, task, ctx.checkedNodes)
	} else if uncheckedCopies > 0 {
		// If we have more copies than needed, but some of them are stored on
		// maintenance nodes, save the local copy.
		ctx.needLocalCopy = true

		p.log.Debug("some of the copies are stored on nodes under maintenance, save local copy",
			zap.Int("count", uncheckedCopies))
	}
}
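
// Worked example of the shortage accounting above (hedged, with assumed
// numbers): under a REP 3 policy, if one remote node confirms the replica over
// HEAD (shortage 3 -> 2), a second node is under maintenance (shortage 2 -> 1,
// uncheckedCopies 1) and the rest answer "object not found", a replication
// task with CopiesNumber = 1 is scheduled over the remaining candidates. Had
// shortage reached 0 instead, the positive uncheckedCopies would set
// needLocalCopy so that the local copy is preserved.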