reverifier.go
// Copyright (C) 2022 Storj Labs, Inc.
// See LICENSE for copying information.

package audit

import (
	"bytes"
	"context"
	"io"
	"time"

	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/errs2"
	"storj.io/common/pb"
	"storj.io/common/pkcrypto"
	"storj.io/common/rpc"
	"storj.io/common/rpc/rpcstatus"
	"storj.io/common/signing"
	"storj.io/common/storj"
	"storj.io/common/uuid"
	"storj.io/storj/satellite/metabase"
	"storj.io/storj/satellite/overlay"
	"storj.io/uplink/private/piecestore"
)
// PieceLocator specifies all information necessary to look up a particular piece
// on a particular satellite.
type PieceLocator struct {
	StreamID uuid.UUID
	Position metabase.SegmentPosition
	NodeID   storj.NodeID
	PieceNum int
}

// ReverificationJob represents a job as received from the reverification
// audit queue.
type ReverificationJob struct {
	Locator       PieceLocator
	InsertedAt    time.Time
	ReverifyCount int
	LastAttempt   time.Time
}
// Reverifier pulls jobs from the reverification queue and fulfills them
// by performing the requested reverifications.
//
// architecture: Worker
type Reverifier struct {
	*Verifier

	log *zap.Logger
	db  ReverifyQueue

	// retryInterval defines a limit on how frequently we will retry
	// reverification audits. At least this long should elapse between
	// attempts.
	retryInterval time.Duration
}
// Outcome enumerates the possible results of a piecewise audit.
//
// Note that it is very similar to reputation.AuditType, but it is
// different in scope and needs a slightly different set of values.
type Outcome int

const (
	// OutcomeNotPerformed indicates an audit was not performed, for any of a
	// variety of reasons, but that it should be reattempted later.
	OutcomeNotPerformed Outcome = iota
	// OutcomeNotNecessary indicates that an audit is no longer required,
	// for example because the segment has been updated or no longer exists.
	OutcomeNotNecessary
	// OutcomeSuccess indicates that an audit took place and the piece was
	// fully validated.
	OutcomeSuccess
	// OutcomeFailure indicates that an audit took place but that the node
	// failed the audit, either because it did not have the piece or the
	// data was incorrect.
	OutcomeFailure
	// OutcomeTimedOut indicates the audit could not be completed because
	// it took too long. The audit should be retried later.
	OutcomeTimedOut
	// OutcomeNodeOffline indicates that the audit could not be completed
	// because the node could not be contacted. The audit should be
	// retried later.
	OutcomeNodeOffline
	// OutcomeUnknownError indicates that the audit could not be completed
	// because of an error not otherwise expected or recognized. The
	// audit should be retried later.
	OutcomeUnknownError
)
// NewReverifier creates a Reverifier.
func NewReverifier(log *zap.Logger, verifier *Verifier, db ReverifyQueue, config Config) *Reverifier {
	return &Reverifier{
		log:           log,
		Verifier:      verifier,
		db:            db,
		retryInterval: config.ReverificationRetryInterval,
	}
}
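
// A minimal wiring sketch (illustrative only; the names verifier, reverifyQueue,
// config, and job are assumed to come from the satellite peer setup and the
// reverification queue, neither of which is shown in this file):
//
//	reverifier := NewReverifier(log.Named("audit:reverifier"), verifier, reverifyQueue, config)
//	var job ReverificationJob // as popped from the reverification queue
//	outcome, reputation := reverifier.ReverifyPiece(ctx, log, &job.Locator)
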
// ReverifyPiece acquires a piece from a single node and verifies its
// contents, its hash, and its order limit.
func (reverifier *Reverifier) ReverifyPiece(ctx context.Context, logger *zap.Logger, locator *PieceLocator) (outcome Outcome, reputation overlay.ReputationStatus) {
	defer mon.Task()(&ctx)(nil)

	outcome, reputation, err := reverifier.DoReverifyPiece(ctx, logger, locator)
	if err != nil {
		logger.Error("could not perform reverification due to error", zap.Error(err))
		return outcome, reputation
	}
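
	// DoReverifyPiece yields exactly one outcome, so at most one of these
	// counters is incremented; each meter below is then marked with 0 or 1.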
	var (
		successes int
		offlines  int
		fails     int
		pending   int
		unknown   int
	)
	switch outcome {
	case OutcomeNotPerformed, OutcomeNotNecessary:
	case OutcomeSuccess:
		successes++
	case OutcomeFailure:
		fails++
	case OutcomeTimedOut:
		pending++
	case OutcomeNodeOffline:
		offlines++
	case OutcomeUnknownError:
		unknown++
	}
	mon.Meter("reverify_successes_global").Mark(successes) //mon:locked
	mon.Meter("reverify_offlines_global").Mark(offlines)   //mon:locked
	mon.Meter("reverify_fails_global").Mark(fails)         //mon:locked
	mon.Meter("reverify_contained_global").Mark(pending)   //mon:locked
	mon.Meter("reverify_unknown_global").Mark(unknown)     //mon:locked

	return outcome, reputation
}

// DoReverifyPiece acquires a piece from a single node and verifies its
// contents, its hash, and its order limit.
func (reverifier *Reverifier) DoReverifyPiece(ctx context.Context, logger *zap.Logger, locator *PieceLocator) (outcome Outcome, reputation overlay.ReputationStatus, err error) {
	defer mon.Task()(&ctx)(&err)

	// First, we must ensure that the specified node still holds the indicated piece.
	segment, err := reverifier.metabase.GetSegmentByPosition(ctx, metabase.GetSegmentByPosition{
		StreamID: locator.StreamID,
		Position: locator.Position,
	})
	if err != nil {
		if metabase.ErrSegmentNotFound.Has(err) {
			logger.Debug("segment no longer exists")
			return OutcomeNotNecessary, reputation, nil
		}
		return OutcomeNotPerformed, reputation, Error.Wrap(err)
	}
	if segment.Expired(reverifier.nowFn()) {
		logger.Debug("segment expired before ReverifyPiece")
		return OutcomeNotNecessary, reputation, nil
	}
	piece, found := segment.Pieces.FindByNum(locator.PieceNum)
	if !found || piece.StorageNode != locator.NodeID {
		logger.Debug("piece is no longer held by the indicated node")
		return OutcomeNotNecessary, reputation, nil
	}

	// TODO remove this once old entries with an empty StreamID are deleted
	if locator.StreamID.IsZero() {
		logger.Debug("ReverifyPiece: skip pending audit with empty StreamID")
		return OutcomeNotNecessary, reputation, nil
	}

	pieceSize := segment.PieceSize()

	limit, piecePrivateKey, cachedNodeInfo, err := reverifier.orders.CreateAuditPieceOrderLimit(ctx, locator.NodeID, uint16(locator.PieceNum), segment.RootPieceID, int32(pieceSize))
	if err != nil {
		if overlay.ErrNodeDisqualified.Has(err) {
			logger.Debug("ReverifyPiece: order limit not created (node is already disqualified)")
			return OutcomeNotNecessary, reputation, nil
		}
		if overlay.ErrNodeFinishedGE.Has(err) {
			logger.Debug("ReverifyPiece: order limit not created (node has completed graceful exit)")
			return OutcomeNotNecessary, reputation, nil
		}
		if overlay.ErrNodeOffline.Has(err) {
			logger.Debug("ReverifyPiece: order limit not created (node considered offline)")
			return OutcomeNodeOffline, reputation, nil
		}
		return OutcomeNotPerformed, reputation, Error.Wrap(err)
	}
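
	// Capture the node's reputation status now, so that every return below,
	// including failure and offline outcomes, reports the reputation observed
	// at the time of the audit.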
	reputation = cachedNodeInfo.Reputation

	pieceData, pieceHash, pieceOriginalLimit, err := reverifier.GetPiece(ctx, limit, piecePrivateKey, cachedNodeInfo.LastIPPort, int32(pieceSize))
	if err != nil {
		if rpc.Error.Has(err) {
			if errs.Is(err, context.DeadlineExceeded) {
				// dial timeout
				return OutcomeTimedOut, reputation, nil
			}
			if errs2.IsRPC(err, rpcstatus.Unknown) {
				// dial failed -- offline node
				return OutcomeNodeOffline, reputation, nil
			}
			// unknown transport error
			logger.Info("ReverifyPiece: unknown transport error", zap.Error(err))
			return OutcomeUnknownError, reputation, nil
		}
		if errs2.IsRPC(err, rpcstatus.NotFound) {
			// Fetch the segment metadata again and see if it has been altered in the interim
			err := reverifier.checkIfSegmentAltered(ctx, segment)
			if err != nil {
				// if so, we skip this audit
				logger.Debug("ReverifyPiece: audit source segment changed during reverification", zap.Error(err))
				return OutcomeNotNecessary, reputation, nil
			}
			// missing share
			logger.Info("ReverifyPiece: audit failure; node indicates piece not found")
			return OutcomeFailure, reputation, nil
		}
		if errs2.IsRPC(err, rpcstatus.DeadlineExceeded) {
			// dial successful, but download timed out
			return OutcomeTimedOut, reputation, nil
		}
		// unknown error
		logger.Info("ReverifyPiece: unknown error from node", zap.Error(err))
		return OutcomeUnknownError, reputation, nil
	}

	// We have successfully acquired the piece from the node. Now, we must verify its contents.
	if pieceHash == nil {
		logger.Info("ReverifyPiece: audit failure; node did not send piece hash as requested")
		return OutcomeFailure, reputation, nil
	}
	if pieceOriginalLimit == nil {
		logger.Info("ReverifyPiece: audit failure; node did not send original order limit as requested")
		return OutcomeFailure, reputation, nil
	}
	// check for the correct size
	if int64(len(pieceData)) != pieceSize {
		logger.Info("ReverifyPiece: audit failure; downloaded piece has incorrect size", zap.Int64("expected-size", pieceSize), zap.Int("received-size", len(pieceData)))
		outcome = OutcomeFailure
		// continue to run, so we can check if the piece was legitimately changed before
		// blaming the node
	} else {
		// check for a matching hash
		downloadedHash := pkcrypto.SHA256Hash(pieceData)
		if !bytes.Equal(downloadedHash, pieceHash.Hash) {
			logger.Info("ReverifyPiece: audit failure; downloaded piece does not match hash", zap.ByteString("downloaded", downloadedHash), zap.ByteString("expected", pieceHash.Hash))
			outcome = OutcomeFailure
			// continue to run, so we can check if the piece was legitimately changed
			// before blaming the node
		} else {
			// check that the order limit and hash sent by the storagenode were
			// correctly signed (order limit signed by this satellite, hash signed
			// by the uplink public key in the order limit)
			signer := signing.SigneeFromPeerIdentity(reverifier.auditor)
			if err := signing.VerifyOrderLimitSignature(ctx, signer, pieceOriginalLimit); err != nil {
				return OutcomeFailure, reputation, nil
			}
			if err := signing.VerifyUplinkPieceHashSignature(ctx, pieceOriginalLimit.UplinkPublicKey, pieceHash); err != nil {
				return OutcomeFailure, reputation, nil
			}
		}
	}

	if err := reverifier.checkIfSegmentAltered(ctx, segment); err != nil {
		logger.Debug("ReverifyPiece: audit source segment changed during reverification", zap.Error(err))
		return OutcomeNotNecessary, reputation, nil
	}
	if outcome == OutcomeFailure {
		return OutcomeFailure, reputation, nil
	}

	return OutcomeSuccess, reputation, nil
}
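
// A hypothetical caller-side sketch (illustrative only; the worker that
// consumes these outcomes is not part of this file). Based on the Outcome doc
// comments above, success, failure, and not-necessary are terminal while the
// remaining outcomes should be retried later. The queue.Remove name is an
// assumption, not an API defined here:
//
//	switch outcome {
//	case OutcomeSuccess, OutcomeFailure, OutcomeNotNecessary:
//		// terminal outcomes: drop the job from the reverification queue
//		_ = queue.Remove(ctx, job.Locator)
//	case OutcomeNotPerformed, OutcomeTimedOut, OutcomeNodeOffline, OutcomeUnknownError:
//		// non-terminal outcomes: leave the job queued to retry after retryInterval
//	}
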
// GetPiece uses the piecestore client to download a piece (and the associated
// original OrderLimit and PieceHash) from a node.
func (reverifier *Reverifier) GetPiece(ctx context.Context, limit *pb.AddressedOrderLimit, piecePrivateKey storj.PiecePrivateKey, cachedIPAndPort string, pieceSize int32) (pieceData []byte, hash *pb.PieceHash, origLimit *pb.OrderLimit, err error) {
	defer mon.Task()(&ctx)(&err)

	// determine the time allotted for receiving data from a storage node
	timedCtx := ctx
	if reverifier.minBytesPerSecond > 0 {
		maxTransferTime := time.Duration(int64(time.Second) * int64(pieceSize) / reverifier.minBytesPerSecond.Int64())
		if maxTransferTime < reverifier.minDownloadTimeout {
			maxTransferTime = reverifier.minDownloadTimeout
		}
		var cancel func()
		timedCtx, cancel = context.WithTimeout(ctx, maxTransferTime)
		defer cancel()
	}
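
	// Worked example (illustrative values, not the configured defaults): with
	// minBytesPerSecond = 128 KiB/s and pieceSize = 2 MiB, maxTransferTime is
	// 2097152/131072 = 16s, which applies unless it falls below
	// minDownloadTimeout.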
	targetNodeID := limit.GetLimit().StorageNodeId
	log := reverifier.log.With(zap.Stringer("node-id", targetNodeID), zap.Stringer("piece-id", limit.GetLimit().PieceId))

	var ps *piecestore.Client

	// if cached IP is given, try connecting there first
	if cachedIPAndPort != "" {
		nodeAddr := storj.NodeURL{
			ID:      targetNodeID,
			Address: cachedIPAndPort,
		}
		ps, err = piecestore.Dial(timedCtx, reverifier.dialer, nodeAddr, piecestore.DefaultConfig)
		if err != nil {
			log.Debug("failed to connect to audit target node at cached IP", zap.String("cached-ip-and-port", cachedIPAndPort), zap.Error(err))
		}
	}

	// if no cached IP was given, or connecting to cached IP failed, use node address
	if ps == nil {
		nodeAddr := storj.NodeURL{
			ID:      targetNodeID,
			Address: limit.GetStorageNodeAddress().Address,
		}
		ps, err = piecestore.Dial(timedCtx, reverifier.dialer, nodeAddr, piecestore.DefaultConfig)
		if err != nil {
			return nil, nil, nil, Error.Wrap(err)
		}
	}

	defer func() {
		err := ps.Close()
		if err != nil {
			log.Error("audit reverifier failed to close conn to node", zap.Error(err))
		}
	}()

	downloader, err := ps.Download(timedCtx, limit.GetLimit(), piecePrivateKey, 0, int64(pieceSize))
	if err != nil {
		return nil, nil, nil, Error.Wrap(err)
	}
	defer func() { err = errs.Combine(err, Error.Wrap(downloader.Close())) }()

	buf := make([]byte, pieceSize)
	_, err = io.ReadFull(downloader, buf)
	if err != nil {
		return nil, nil, nil, Error.Wrap(err)
	}
	hash, originLimit := downloader.GetHashAndLimit()

	return buf, hash, originLimit, nil
}