-
Notifications
You must be signed in to change notification settings - Fork 402
/
filewalker.go
192 lines (168 loc) · 8.01 KB
/
filewalker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package pieces
import (
"context"
"os"
"runtime"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/bloomfilter"
"storj.io/common/storj"
"storj.io/storj/storagenode/blobstore"
"storj.io/storj/storagenode/blobstore/filestore"
)
var errFileWalker = errs.Class("filewalker")
// FileWalker implements methods to walk over pieces in a storage directory.
type FileWalker struct {
log *zap.Logger
blobs blobstore.Blobs
v0PieceInfo V0PieceInfoDB
}
// NewFileWalker creates a new FileWalker.
func NewFileWalker(log *zap.Logger, blobs blobstore.Blobs, db V0PieceInfoDB) *FileWalker {
return &FileWalker{
log: log,
blobs: blobs,
v0PieceInfo: db,
}
}
// WalkSatellitePieces executes walkFunc for each locally stored piece in the namespace of the
// given satellite. If walkFunc returns a non-nil error, WalkSatellitePieces will stop iterating
// and return the error immediately. The ctx parameter is intended specifically to allow canceling
// iteration early.
//
// Note that this method includes all locally stored pieces, both V0 and higher.
func (fw *FileWalker) WalkSatellitePieces(ctx context.Context, satellite storj.NodeID, fn func(StoredPieceAccess) error) (err error) {
defer mon.Task()(&ctx)(&err)
// iterate over all in V1 storage, skipping v0 pieces
err = fw.blobs.WalkNamespace(ctx, satellite.Bytes(), func(blobInfo blobstore.BlobInfo) error {
if blobInfo.StorageFormatVersion() < filestore.FormatV1 {
// skip v0 pieces, which are handled separately
return nil
}
pieceAccess, err := newStoredPieceAccess(fw.blobs, blobInfo)
if err != nil {
// this is not a real piece blob. the blob store can't distinguish between actual piece
// blobs and stray files whose names happen to decode as valid base32. skip this
// "blob".
return nil //nolint: nilerr // we ignore other files
}
return fn(pieceAccess)
})
if err == nil && fw.v0PieceInfo != nil {
// iterate over all in V0 storage
err = fw.v0PieceInfo.WalkSatelliteV0Pieces(ctx, fw.blobs, satellite, fn)
}
return errFileWalker.Wrap(err)
}
// WalkAndComputeSpaceUsedBySatellite walks over all pieces for a given satellite, adds up and returns the total space used.
func (fw *FileWalker) WalkAndComputeSpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (satPiecesTotal int64, satPiecesContentSize int64, err error) {
err = fw.WalkSatellitePieces(ctx, satelliteID, func(access StoredPieceAccess) error {
pieceTotal, pieceContentSize, err := access.Size(ctx)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
satPiecesTotal += pieceTotal
satPiecesContentSize += pieceContentSize
return nil
})
return satPiecesTotal, satPiecesContentSize, errFileWalker.Wrap(err)
}
// WalkSatellitePiecesToTrash returns a list of piece IDs that need to be trashed for the given satellite.
//
// ------------------------------------------------------------------------------------------------
//
// On the correctness of using access.ModTime() in place of the more precise access.CreationTime():
//
// ------------------------------------------------------------------------------------------------
//
// Background: for pieces not stored with storage.FormatV0, the access.CreationTime() value can
// only be retrieved by opening the piece file, and reading and unmarshaling the piece header.
// This is far slower than access.ModTime(), which gets the file modification time from the file
// system and only needs to do a stat(2) on the piece file. If we can make Retain() work with
// ModTime, we should.
//
// Possibility of mismatch: We do not force or require piece file modification times to be equal to
// or close to the CreationTime specified by the uplink, but we do expect that piece files will be
// written to the filesystem _after_ the CreationTime. We make the assumption already that storage
// nodes and satellites and uplinks have system clocks that are very roughly in sync (that is, they
// are out of sync with each other by less than an hour of real time, or whatever is configured as
// MaxTimeSkew). So if an uplink is not lying about CreationTime and it uploads a piece that
// makes it to a storagenode's disk as quickly as possible, even in the worst-synchronized-clocks
// case we can assume that `ModTime > (CreationTime - MaxTimeSkew)`. We also allow for storage
// node operators doing file system manipulations after a piece has been written. If piece files
// are copied between volumes and their attributes are not preserved, it will be possible for their
// modification times to be changed to something later in time. This still preserves the inequality
// relationship mentioned above, `ModTime > (CreationTime - MaxTimeSkew)`. We only stipulate
// that storage node operators must not artificially change blob file modification times to be in
// the past.
//
// If there is a mismatch: in most cases, a mismatch between ModTime and CreationTime has no
// effect. In certain remaining cases, the only effect is that a piece file which _should_ be
// garbage collected survives until the next round of garbage collection. The only really
// problematic case is when there is a relatively new piece file which was created _after_ this
// node's Retain bloom filter started being built on the satellite, and is recorded in this
// storage node's blob store before the Retain operation has completed. Then, it might be possible
// for that new piece to be garbage collected incorrectly, because it does not show up in the
// bloom filter and the node incorrectly thinks that it was created before the bloom filter.
// But if the uplink is not lying about CreationTime and its clock drift versus the storage node
// is less than `MaxTimeSkew`, and the ModTime on a blob file is correctly set from the
// storage node system time, then it is still true that `ModTime > (CreationTime -
// MaxTimeSkew)`.
//
// The rule that storage node operators need to be aware of is only this: do not artificially set
// mtimes on blob files to be in the past. Let the filesystem manage mtimes. If blob files need to
// be moved or copied between locations, and this updates the mtime, that is ok. A secondary effect
// of this rule is that if the storage node's system clock needs to be changed forward by a
// nontrivial amount, mtimes on existing blobs should also be adjusted (by the same interval,
// ideally, but just running "touch" on all blobs is sufficient to avoid incorrect deletion of
// data).
func (fw *FileWalker) WalkSatellitePiecesToTrash(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, filter *bloomfilter.Filter) (pieceIDs []storj.PieceID, piecesCount, piecesSkipped int64, err error) {
defer mon.Task()(&ctx)(&err)
if filter == nil {
return
}
err = fw.WalkSatellitePieces(ctx, satelliteID, func(access StoredPieceAccess) error {
piecesCount++
// We call Gosched() when done because the GC process is expected to be long and we want to keep it at low priority,
// so other goroutines can continue serving requests.
defer runtime.Gosched()
pieceID := access.PieceID()
if filter.Contains(pieceID) {
// This piece is explicitly not trash. Move on.
return nil
}
// If the blob's mtime is at or after the createdBefore line, we can't safely delete it;
// it might not be trash. If it is, we can expect to get it next time.
//
// See the comment above the WalkSatellitePiecesToTrash() function for a discussion on the correctness
// of using ModTime in place of the more precise CreationTime.
mTime, err := access.ModTime(ctx)
if err != nil {
if os.IsNotExist(err) {
// piece was deleted while we were scanning.
return nil
}
piecesSkipped++
fw.log.Warn("failed to determine mtime of blob", zap.Error(err))
// but continue iterating.
return nil
}
if !mTime.Before(createdBefore) {
return nil
}
pieceIDs = append(pieceIDs, pieceID)
select {
case <-ctx.Done():
return ctx.Err()
default:
}
return nil
})
return pieceIDs, piecesCount, piecesSkipped, errFileWalker.Wrap(err)
}