-
Notifications
You must be signed in to change notification settings - Fork 402
/
store.go
763 lines (672 loc) · 27.5 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package pieces
import (
"context"
"io"
"os"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/memory"
"storj.io/common/pb"
"storj.io/common/storj"
"storj.io/storj/storage"
"storj.io/storj/storage/filestore"
)
var (
// Error is the default error class.
Error = errs.Class("pieces error")
mon = monkit.Package()
)
// Info contains all the information we need to know about a Piece to manage them.
type Info struct {
	SatelliteID storj.NodeID  // satellite that owns the piece
	PieceID     storj.PieceID // piece identifier within the satellite's namespace

	PieceSize       int64     // size of the piece in bytes
	PieceCreation   time.Time // when the piece was created
	PieceExpiration time.Time // when the piece expires (presumably zero when it never expires — TODO confirm)

	OrderLimit      *pb.OrderLimit // order limit supplied when the piece was stored
	UplinkPieceHash *pb.PieceHash  // piece hash supplied by the uplink
}
// ExpiredInfo is a fully namespaced piece id.
type ExpiredInfo struct {
	SatelliteID storj.NodeID
	PieceID     storj.PieceID

	// InPieceInfo can be removed when we no longer need to support the pieceinfo db. Its only
	// purpose is to keep track of whether expired entries came from piece_expirations or pieceinfo.
	InPieceInfo bool
}
// PieceExpirationDB stores information about pieces with expiration dates.
//
// architecture: Database
type PieceExpirationDB interface {
	// GetExpired gets piece IDs that expire or have expired before the given time.
	GetExpired(ctx context.Context, expiresBefore time.Time, limit int64) ([]ExpiredInfo, error)
	// SetExpiration sets an expiration time for the given piece ID on the given satellite.
	SetExpiration(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, expiresAt time.Time) error
	// DeleteExpiration removes an expiration record for the given piece ID on the given
	// satellite. The returned bool reports whether a record was found.
	DeleteExpiration(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (found bool, err error)
	// DeleteFailed marks an expiration record as having experienced a failure in deleting the
	// piece from the disk.
	DeleteFailed(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, failedAt time.Time) error
	// Trash marks a piece as in the trash.
	Trash(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) error
	// RestoreTrash marks all pieces of the satellite as not being in trash.
	RestoreTrash(ctx context.Context, satelliteID storj.NodeID) error
}
// V0PieceInfoDB stores meta information about pieces stored with storage format V0 (where
// metadata goes in the "pieceinfo" table in the storagenodedb). The actual pieces are stored
// behind something providing the storage.Blobs interface.
//
// architecture: Database
type V0PieceInfoDB interface {
	// Get returns Info about a piece.
	Get(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (*Info, error)
	// Delete deletes Info about a piece.
	Delete(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) error
	// DeleteFailed marks piece deletion from disk failed.
	DeleteFailed(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID, failedAt time.Time) error
	// GetExpired gets piece IDs stored with storage format V0 that expire or have expired
	// before the given time.
	GetExpired(ctx context.Context, expiredAt time.Time, limit int64) ([]ExpiredInfo, error)
	// WalkSatelliteV0Pieces executes walkFunc for each locally stored piece, stored
	// with storage format V0 in the namespace of the given satellite. If walkFunc returns a
	// non-nil error, WalkSatelliteV0Pieces will stop iterating and return the error
	// immediately. The ctx parameter is intended specifically to allow canceling iteration
	// early.
	WalkSatelliteV0Pieces(ctx context.Context, blobStore storage.Blobs, satellite storj.NodeID, walkFunc func(StoredPieceAccess) error) error
}
// V0PieceInfoDBForTest is like V0PieceInfoDB, but adds on the Add() method so
// that test environments with V0 piece data can be set up.
type V0PieceInfoDBForTest interface {
	V0PieceInfoDB

	// Add inserts Info to the database. This is only a valid thing to do, now,
	// during tests, to replicate the environment of a storage node not yet fully
	// migrated to V1 storage.
	Add(context.Context, *Info) error
}
// PieceSpaceUsedDB stores the most recent totals from the space used cache.
//
// architecture: Database
type PieceSpaceUsedDB interface {
	// Init creates the one total and trash record if it doesn't already exist.
	Init(ctx context.Context) error
	// GetPieceTotals returns the space used (total and contentSize) by all pieces stored.
	GetPieceTotals(ctx context.Context) (piecesTotal int64, piecesContentSize int64, err error)
	// UpdatePieceTotals updates the record for aggregate spaced used for pieces (total and contentSize) with new values.
	UpdatePieceTotals(ctx context.Context, piecesTotal, piecesContentSize int64) error
	// GetPieceTotalsForAllSatellites returns how much total space used by pieces stored for each satelliteID.
	GetPieceTotalsForAllSatellites(ctx context.Context) (map[storj.NodeID]SatelliteUsage, error)
	// UpdatePieceTotalsForAllSatellites updates each record for total spaced used with a new value for each satelliteID.
	UpdatePieceTotalsForAllSatellites(ctx context.Context, newTotalsBySatellites map[storj.NodeID]SatelliteUsage) error
	// GetTrashTotal returns the total space used by trash.
	GetTrashTotal(ctx context.Context) (int64, error)
	// UpdateTrashTotal updates the record for total spaced used for trash with a new value.
	UpdateTrashTotal(ctx context.Context, newTotal int64) error
}
// StoredPieceAccess allows inspection and manipulation of a piece during iteration with
// WalkSatellitePieces-type methods.
type StoredPieceAccess interface {
	storage.BlobInfo

	// PieceID gives the pieceID of the piece.
	PieceID() storj.PieceID
	// Satellite gives the nodeID of the satellite which owns the piece.
	Satellite() (storj.NodeID, error)
	// Size gives the size of the piece on disk, and the size of the piece
	// content (not including the piece header, if applicable).
	Size(ctx context.Context) (int64, int64, error)
	// CreationTime returns the piece creation time as given in the original PieceHash (which is
	// likely not the same as the file mtime). For non-FormatV0 pieces, this requires opening
	// the file and unmarshaling the piece header. If exact precision is not required, ModTime()
	// may be a better solution.
	CreationTime(ctx context.Context) (time.Time, error)
	// ModTime returns a less-precise piece creation time than CreationTime, but is generally
	// much faster. For non-FormatV0 pieces, this gets the piece creation time from the
	// filesystem instead of the piece header.
	ModTime(ctx context.Context) (time.Time, error)
}
// SatelliteUsage contains information of how much space is used by a satellite.
type SatelliteUsage struct {
	Total       int64 // the total space used (including headers)
	ContentSize int64 // only content size used (excluding things like headers)
}
// Config is configuration for Store.
type Config struct {
	// WritePreallocSize is the amount of space preallocated when creating a new blob for upload.
	WritePreallocSize memory.Size `help:"file preallocated for uploading" default:"4MiB"`
}
// DefaultConfig is the default value for the Config.
var DefaultConfig = Config{
	WritePreallocSize: 4 * memory.MiB,
}
// Store implements storing pieces onto a blob storage implementation.
//
// architecture: Database
type Store struct {
	log    *zap.Logger
	config Config

	blobs          storage.Blobs     // underlying blob storage for piece data
	v0PieceInfo    V0PieceInfoDB     // metadata for legacy V0 pieces; may be nil (callers guard with != nil)
	expirationInfo PieceExpirationDB // expiration records; guarded with != nil in Delete
	spaceUsedDB    PieceSpaceUsedDB  // cached space-used totals
}
// StoreForTest is a wrapper around Store to be used only in test scenarios. It enables writing
// pieces with older storage formats.
type StoreForTest struct {
	*Store
}
// NewStore creates a new piece store backed by the given blob storage and
// metadata databases.
func NewStore(log *zap.Logger, blobs storage.Blobs, v0PieceInfo V0PieceInfoDB, expirationInfo PieceExpirationDB, pieceSpaceUsedDB PieceSpaceUsedDB, config Config) *Store {
	store := &Store{
		log:            log,
		blobs:          blobs,
		config:         config,
		v0PieceInfo:    v0PieceInfo,
		expirationInfo: expirationInfo,
		spaceUsedDB:    pieceSpaceUsedDB,
	}
	return store
}
// Writer returns a new piece writer for the given piece in the satellite's namespace.
func (store *Store) Writer(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (_ *Writer, err error) {
	defer mon.Task()(&ctx)(&err)
	ref := storage.BlobRef{
		Namespace: satellite.Bytes(),
		Key:       pieceID.Bytes(),
	}
	blobWriter, err := store.blobs.Create(ctx, ref, store.config.WritePreallocSize.Int64())
	if err != nil {
		return nil, Error.Wrap(err)
	}
	w, err := NewWriter(store.log.Named("blob-writer"), blobWriter, store.blobs, satellite)
	return w, Error.Wrap(err)
}
// WriterForFormatVersion allows opening a piece writer with a specified storage format version.
// This is meant to be used externally only in test situations (thus the StoreForTest receiver
// type).
func (store StoreForTest) WriterForFormatVersion(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, formatVersion storage.FormatVersion) (_ *Writer, err error) {
	defer mon.Task()(&ctx)(&err)

	ref := storage.BlobRef{
		Namespace: satellite.Bytes(),
		Key:       pieceID.Bytes(),
	}
	var blobWriter storage.BlobWriter
	switch formatVersion {
	case filestore.FormatV0:
		// V0 blobs can only be created through a test hook on the blob store.
		v0Creator, ok := store.blobs.(interface {
			TestCreateV0(ctx context.Context, ref storage.BlobRef) (_ storage.BlobWriter, err error)
		})
		if !ok {
			return nil, Error.New("can't make a WriterForFormatVersion with this blob store (%T)", store.blobs)
		}
		blobWriter, err = v0Creator.TestCreateV0(ctx, ref)
	case filestore.FormatV1:
		blobWriter, err = store.blobs.Create(ctx, ref, store.config.WritePreallocSize.Int64())
	default:
		return nil, Error.New("please teach me how to make V%d pieces", formatVersion)
	}
	if err != nil {
		return nil, Error.Wrap(err)
	}
	w, err := NewWriter(store.log.Named("blob-writer"), blobWriter, store.blobs, satellite)
	return w, Error.Wrap(err)
}
// Reader returns a new piece reader for the given piece in the satellite's namespace.
func (store *Store) Reader(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (_ *Reader, err error) {
	defer mon.Task()(&ctx)(&err)
	ref := storage.BlobRef{Namespace: satellite.Bytes(), Key: pieceID.Bytes()}
	blob, err := store.blobs.Open(ctx, ref)
	if err != nil {
		// Return the raw not-found error unwrapped so callers can detect it
		// with os.IsNotExist.
		if os.IsNotExist(err) {
			return nil, err
		}
		return nil, Error.Wrap(err)
	}
	r, err := NewReader(blob)
	return r, Error.Wrap(err)
}
// ReaderWithStorageFormat returns a new piece reader for a located piece, which avoids the
// potential need to check multiple storage formats to find the right blob.
func (store *Store) ReaderWithStorageFormat(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, formatVersion storage.FormatVersion) (_ *Reader, err error) {
	defer mon.Task()(&ctx)(&err)
	ref := storage.BlobRef{
		Namespace: satellite.Bytes(),
		Key:       pieceID.Bytes(),
	}
	blob, err := store.blobs.OpenWithStorageFormat(ctx, ref, formatVersion)
	if err != nil {
		// Keep not-found errors unwrapped so os.IsNotExist works for callers.
		if os.IsNotExist(err) {
			return nil, err
		}
		return nil, Error.Wrap(err)
	}
	r, err := NewReader(blob)
	return r, Error.Wrap(err)
}
// Delete deletes the specified piece from blob storage and removes any
// associated expiration and V0 pieceinfo records.
func (store *Store) Delete(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error) {
	defer mon.Task()(&ctx)(&err)
	ref := storage.BlobRef{
		Namespace: satellite.Bytes(),
		Key:       pieceID.Bytes(),
	}
	if err = store.blobs.Delete(ctx, ref); err != nil {
		return Error.Wrap(err)
	}
	// Remove records in both the piece_expirations and pieceinfo DBs, wherever we find them.
	// Both of these calls should return no error if the requested record is not found.
	if store.expirationInfo != nil {
		_, err = store.expirationInfo.DeleteExpiration(ctx, satellite, pieceID)
	}
	if store.v0PieceInfo != nil {
		err = errs.Combine(err, store.v0PieceInfo.Delete(ctx, satellite, pieceID))
	}
	return Error.Wrap(err)
}
// DeleteSatelliteBlobs deletes blobs folder of specific satellite after successful GE
// (graceful exit).
func (store *Store) DeleteSatelliteBlobs(ctx context.Context, satellite storj.NodeID) (err error) {
	defer mon.Task()(&ctx)(&err)
	return Error.Wrap(store.blobs.DeleteNamespace(ctx, satellite.Bytes()))
}
// Trash moves the specified piece to the blob trash. If necessary, it converts
// the v0 piece to a v1 piece. It also marks the item as "trashed" in the
// pieceExpirationDB.
func (store *Store) Trash(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (err error) {
	defer mon.Task()(&ctx)(&err)
	ref := storage.BlobRef{
		Namespace: satellite.Bytes(),
		Key:       pieceID.Bytes(),
	}
	// Check if the MaxFormatVersionSupported piece exists. If not, we assume
	// this is an old piece version and attempt to migrate it.
	_, err = store.blobs.StatWithStorageFormat(ctx, ref, filestore.MaxFormatVersionSupported)
	if err != nil && !errs.IsFunc(err, os.IsNotExist) {
		return Error.Wrap(err)
	}
	if errs.IsFunc(err, os.IsNotExist) {
		// MaxFormatVersionSupported does not exist, migrate.
		err = store.MigrateV0ToV1(ctx, satellite, pieceID)
		if err != nil {
			return Error.Wrap(err)
		}
	}
	// Delete guards expirationInfo with a nil check before using it; do the
	// same here so a Store constructed without an expiration DB cannot panic.
	if store.expirationInfo != nil {
		err = store.expirationInfo.Trash(ctx, satellite, pieceID)
	}
	err = errs.Combine(err, store.blobs.Trash(ctx, ref))
	return Error.Wrap(err)
}
// EmptyTrash deletes pieces in the trash that have been in there longer than trashExpiryInterval.
func (store *Store) EmptyTrash(ctx context.Context, satelliteID storj.NodeID, trashedBefore time.Time) (err error) {
	defer mon.Task()(&ctx)(&err)
	_, deletedIDs, err := store.blobs.EmptyTrash(ctx, satelliteID[:], trashedBefore)
	if err != nil {
		return Error.Wrap(err)
	}
	// Drop the expiration record for every piece the blob store actually removed.
	for _, rawID := range deletedIDs {
		id, parseErr := storj.PieceIDFromBytes(rawID)
		if parseErr != nil {
			return Error.Wrap(parseErr)
		}
		if _, delErr := store.expirationInfo.DeleteExpiration(ctx, satelliteID, id); delErr != nil {
			err = errs.Combine(err, delErr)
		}
	}
	return Error.Wrap(err)
}
// RestoreTrash restores all pieces in the trash for the given satellite and
// clears their trashed mark in the expiration DB.
func (store *Store) RestoreTrash(ctx context.Context, satelliteID storj.NodeID) (err error) {
	defer mon.Task()(&ctx)(&err)
	if _, err = store.blobs.RestoreTrash(ctx, satelliteID.Bytes()); err != nil {
		return Error.Wrap(err)
	}
	return Error.Wrap(store.expirationInfo.RestoreTrash(ctx, satelliteID))
}
// MigrateV0ToV1 will migrate a piece stored with storage format v0 to storage
// format v1. If the piece is not stored as a v0 piece it will return an error.
// The following failures are possible:
//   - Fail to open or read v0 piece. In this case no artifacts remain.
//   - Fail to Write or Commit v1 piece. In this case no artifacts remain.
//   - Fail to Delete v0 piece. In this case v0 piece may remain, but v1 piece
//     will exist and be preferred in future calls.
func (store *Store) MigrateV0ToV1(ctx context.Context, satelliteID storj.NodeID, pieceID storj.PieceID) (err error) {
	defer mon.Task()(&ctx)(&err)
	info, err := store.v0PieceInfo.Get(ctx, satelliteID, pieceID)
	if err != nil {
		return Error.Wrap(err)
	}

	// Copy the V0 piece into a fresh V1 blob inside a closure so the reader's
	// Close error is folded into err before we decide whether to delete the V0
	// artifacts below.
	err = func() (err error) {
		r, err := store.Reader(ctx, satelliteID, pieceID)
		if err != nil {
			return err
		}
		defer func() { err = errs.Combine(err, r.Close()) }()

		w, err := store.Writer(ctx, satelliteID, pieceID)
		if err != nil {
			return err
		}
		_, err = io.Copy(w, r)
		if err != nil {
			// Cancel the partially written V1 blob so no artifacts remain.
			return errs.Combine(err, w.Cancel(ctx))
		}
		// Build the V1 piece header from the V0 metadata record.
		header := &pb.PieceHeader{
			Hash:         w.Hash(),
			CreationTime: info.PieceCreation,
			Signature:    info.UplinkPieceHash.GetSignature(),
			OrderLimit:   *info.OrderLimit,
		}
		return w.Commit(ctx, header)
	}()
	if err != nil {
		return Error.Wrap(err)
	}

	// The V1 piece is committed; remove the V0 blob and its pieceinfo record.
	err = store.blobs.DeleteWithStorageFormat(ctx, storage.BlobRef{
		Namespace: satelliteID.Bytes(),
		Key:       pieceID.Bytes(),
	}, filestore.FormatV0)

	if store.v0PieceInfo != nil {
		err = errs.Combine(err, store.v0PieceInfo.Delete(ctx, satelliteID, pieceID))
	}

	return Error.Wrap(err)
}
// GetV0PieceInfoDBForTest returns this piece-store's reference to the V0 piece info DB (or nil,
// if this piece-store does not have one). This is ONLY intended for use with testing
// functionality.
func (store StoreForTest) GetV0PieceInfoDBForTest() V0PieceInfoDBForTest {
	if db := store.v0PieceInfo; db != nil {
		return db.(V0PieceInfoDBForTest)
	}
	return nil
}
// GetHashAndLimit returns the PieceHash and OrderLimit associated with the specified piece. The
// piece must already have been opened for reading, and the associated *Reader passed in.
//
// Once we have migrated everything off of V0 storage and no longer need to support it, this can
// cleanly become a method directly on *Reader and will need only the 'pieceID' parameter.
func (store *Store) GetHashAndLimit(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, reader *Reader) (pb.PieceHash, pb.OrderLimit, error) {
	// V0 pieces keep their hash and order limit in the pieceinfo DB rather than
	// in a piece header.
	if reader.StorageFormatVersion() == filestore.FormatV0 {
		info, err := store.GetV0PieceInfo(ctx, satellite, pieceID)
		if err != nil {
			return pb.PieceHash{}, pb.OrderLimit{}, err // err is already wrapped as a storagenodedb.ErrPieceInfo
		}
		return *info.UplinkPieceHash, *info.OrderLimit, nil
	}
	header, err := reader.GetPieceHeader()
	if err != nil {
		return pb.PieceHash{}, pb.OrderLimit{}, Error.Wrap(err)
	}
	hash := pb.PieceHash{
		PieceId:   pieceID,
		Hash:      header.GetHash(),
		PieceSize: reader.Size(),
		Timestamp: header.GetCreationTime(),
		Signature: header.GetSignature(),
	}
	return hash, header.OrderLimit, nil
}
// WalkSatellitePieces executes walkFunc for each locally stored piece in the namespace of the
// given satellite. If walkFunc returns a non-nil error, WalkSatellitePieces will stop iterating
// and return the error immediately. The ctx parameter is intended specifically to allow canceling
// iteration early.
//
// Note that this method includes all locally stored pieces, both V0 and higher.
func (store *Store) WalkSatellitePieces(ctx context.Context, satellite storj.NodeID, walkFunc func(StoredPieceAccess) error) (err error) {
	defer mon.Task()(&ctx)(&err)
	// Visit everything in V1 storage first, then everything in V0.
	walkV1 := func(blobInfo storage.BlobInfo) error {
		if blobInfo.StorageFormatVersion() < filestore.FormatV1 {
			// we'll address this piece while iterating over the V0 pieces below.
			return nil
		}
		access, accessErr := newStoredPieceAccess(store, blobInfo)
		if accessErr != nil {
			// this is not a real piece blob. the blob store can't distinguish between actual piece
			// blobs and stray files whose names happen to decode as valid base32. skip this
			// "blob".
			return nil
		}
		return walkFunc(access)
	}
	if err = store.blobs.WalkNamespace(ctx, satellite.Bytes(), walkV1); err != nil {
		return err
	}
	if store.v0PieceInfo != nil {
		return store.v0PieceInfo.WalkSatelliteV0Pieces(ctx, store.blobs, satellite, walkFunc)
	}
	return nil
}
// GetExpired gets piece IDs that are expired and were created before the given time.
func (store *Store) GetExpired(ctx context.Context, expiredAt time.Time, limit int64) (_ []ExpiredInfo, err error) {
	defer mon.Task()(&ctx)(&err)
	expired, err := store.expirationInfo.GetExpired(ctx, expiredAt, limit)
	if err != nil {
		return nil, err
	}
	// Top up from the legacy pieceinfo DB if the limit isn't yet reached.
	remaining := limit - int64(len(expired))
	if remaining > 0 && store.v0PieceInfo != nil {
		v0Expired, v0Err := store.v0PieceInfo.GetExpired(ctx, expiredAt, remaining)
		if v0Err != nil {
			return nil, v0Err
		}
		expired = append(expired, v0Expired...)
	}
	return expired, nil
}
// SetExpiration records an expiration time for the specified piece ID owned by the specified satellite.
func (store *Store) SetExpiration(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID, expiresAt time.Time) (err error) {
	// The named err result was declared but never instrumented; add the monkit
	// task like every other Store method so this call appears in traces.
	defer mon.Task()(&ctx)(&err)
	return store.expirationInfo.SetExpiration(ctx, satellite, pieceID, expiresAt)
}
// DeleteFailed marks piece as a failed deletion, recording the failure in
// whichever DB the expired entry came from.
func (store *Store) DeleteFailed(ctx context.Context, expired ExpiredInfo, when time.Time) (err error) {
	defer mon.Task()(&ctx)(&err)
	if !expired.InPieceInfo {
		return store.expirationInfo.DeleteFailed(ctx, expired.SatelliteID, expired.PieceID, when)
	}
	return store.v0PieceInfo.DeleteFailed(ctx, expired.SatelliteID, expired.PieceID, when)
}
// SpaceUsedForPieces returns *an approximation of* the disk space used by all local pieces (both
// V0 and later). This is an approximation because changes may be being applied to the filestore as
// this information is collected, and because it is possible that various errors in directory
// traversal could cause this count to be undersized.
//
// Returns:
//   - piecesTotal: the total space used by pieces, including headers
//   - piecesContentSize: the space used by piece content, not including headers
func (store *Store) SpaceUsedForPieces(ctx context.Context) (piecesTotal int64, piecesContentSize int64, err error) {
	// The usage cache keeps these totals up to date; prefer it when available.
	if cache, ok := store.blobs.(*BlobsUsageCache); ok {
		return cache.SpaceUsedForPieces(ctx)
	}
	satellites, err := store.getAllStoringSatellites(ctx)
	if err != nil {
		return 0, 0, err
	}
	var total, contentSize int64
	for _, satellite := range satellites {
		satTotal, satContent, satErr := store.SpaceUsedBySatellite(ctx, satellite)
		if satErr != nil {
			return 0, 0, satErr
		}
		total += satTotal
		contentSize += satContent
	}
	return total, contentSize, nil
}
// SpaceUsedForTrash returns the total space used by the piece store's
// trash, including all headers.
func (store *Store) SpaceUsedForTrash(ctx context.Context) (int64, error) {
	// If the blobs is cached (a *BlobsUsageCache), it will return the cached value.
	return store.blobs.SpaceUsedForTrash(ctx)
}
// SpaceUsedForPiecesAndTrash returns the total space used by both active
// pieces and the trash directory.
func (store *Store) SpaceUsedForPiecesAndTrash(ctx context.Context) (int64, error) {
	pieces, _, err := store.SpaceUsedForPieces(ctx)
	if err != nil {
		return 0, err
	}
	trash, err := store.SpaceUsedForTrash(ctx)
	if err != nil {
		return 0, err
	}
	return pieces + trash, nil
}
// getAllStoringSatellites lists the blob namespaces and decodes each one into
// a satellite node ID.
func (store *Store) getAllStoringSatellites(ctx context.Context) ([]storj.NodeID, error) {
	namespaces, err := store.blobs.ListNamespaces(ctx)
	if err != nil {
		return nil, err
	}
	satellites := make([]storj.NodeID, 0, len(namespaces))
	for _, namespace := range namespaces {
		id, err := storj.NodeIDFromBytes(namespace)
		if err != nil {
			return nil, err
		}
		satellites = append(satellites, id)
	}
	return satellites, nil
}
// SpaceUsedBySatellite calculates *an approximation of* how much disk space is used for local
// piece storage in the given satellite's namespace. This is an approximation because changes may
// be being applied to the filestore as this information is collected, and because it is possible
// that various errors in directory traversal could cause this count to be undersized.
//
// This returns both the total size of pieces plus the contentSize of pieces.
func (store *Store) SpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (piecesTotal, piecesContentSize int64, err error) {
	defer mon.Task()(&ctx)(&err)
	// The usage cache keeps these totals up to date; prefer it when available.
	if cache, ok := store.blobs.(*BlobsUsageCache); ok {
		return cache.SpaceUsedBySatellite(ctx, satelliteID)
	}
	var total, contentSize int64
	walkErr := store.WalkSatellitePieces(ctx, satelliteID, func(access StoredPieceAccess) error {
		size, content, statErr := access.Size(ctx)
		if statErr != nil {
			store.log.Error("failed to stat", zap.Error(statErr), zap.Stringer("Piece ID", access.PieceID()), zap.Stringer("Satellite ID", satelliteID))
			// keep iterating; we want a best effort total here.
			return nil
		}
		total += size
		contentSize += content
		return nil
	})
	if walkErr != nil {
		return 0, 0, walkErr
	}
	return total, contentSize, nil
}
// SpaceUsedTotalAndBySatellite adds up the space used by and for all satellites for blob storage.
//
// Walk errors for individual satellites are collected into an errs.Group rather
// than aborting, so the returned totals include every satellite that could be
// walked even when group.Err() is non-nil.
func (store *Store) SpaceUsedTotalAndBySatellite(ctx context.Context) (piecesTotal, piecesContentSize int64, totalBySatellite map[storj.NodeID]SatelliteUsage, err error) {
	defer mon.Task()(&ctx)(&err)

	satelliteIDs, err := store.getAllStoringSatellites(ctx)
	if err != nil {
		return 0, 0, nil, Error.New("failed to enumerate satellites: %w", err)
	}

	totalBySatellite = map[storj.NodeID]SatelliteUsage{}
	var group errs.Group
	for _, satelliteID := range satelliteIDs {
		var satPiecesTotal int64
		var satPiecesContentSize int64

		// Sum the on-disk and content sizes of every piece in this satellite's namespace.
		err := store.WalkSatellitePieces(ctx, satelliteID, func(access StoredPieceAccess) error {
			pieceTotal, pieceContentSize, err := access.Size(ctx)
			if err != nil {
				return err
			}
			satPiecesTotal += pieceTotal
			satPiecesContentSize += pieceContentSize
			return nil
		})
		if err != nil {
			// record the failure but keep accumulating other satellites
			group.Add(err)
		}

		piecesTotal += satPiecesTotal
		piecesContentSize += satPiecesContentSize
		totalBySatellite[satelliteID] = SatelliteUsage{
			Total:       satPiecesTotal,
			ContentSize: satPiecesContentSize,
		}
	}
	return piecesTotal, piecesContentSize, totalBySatellite, group.Err()
}
// GetV0PieceInfo fetches the Info record from the V0 piece info database. Obviously,
// of no use when a piece does not have filestore.FormatV0 storage.
//
// Note: panics if the store was constructed without a V0 piece info DB.
func (store *Store) GetV0PieceInfo(ctx context.Context, satellite storj.NodeID, pieceID storj.PieceID) (*Info, error) {
	return store.v0PieceInfo.Get(ctx, satellite, pieceID)
}
// StorageStatus contains information about the disk store is using.
type StorageStatus struct {
	DiskUsed int64 // bytes used; currently reported as -1 (see Store.StorageStatus TODO)
	DiskFree int64 // bytes free, as reported by the blob store
}
// StorageStatus returns information about the disk.
func (store *Store) StorageStatus(ctx context.Context) (_ StorageStatus, err error) {
	defer mon.Task()(&ctx)(&err)
	free, err := store.blobs.FreeSpace()
	if err != nil {
		return StorageStatus{}, err
	}
	status := StorageStatus{
		DiskUsed: -1, // TODO set value
		DiskFree: free,
	}
	return status, nil
}
// storedPieceAccess implements StoredPieceAccess on top of a storage.BlobInfo.
type storedPieceAccess struct {
	storage.BlobInfo

	store   *Store        // used to open the piece when reading its header
	pieceID storj.PieceID // decoded from the blob key at construction time
}
// newStoredPieceAccess decodes the blob's key into a piece ID and wraps the
// blob info for use during piece iteration. Returns an error when the key is
// not a valid piece ID.
func newStoredPieceAccess(store *Store, blobInfo storage.BlobInfo) (storedPieceAccess, error) {
	id, err := storj.PieceIDFromBytes(blobInfo.BlobRef().Key)
	if err != nil {
		return storedPieceAccess{}, err
	}
	access := storedPieceAccess{
		BlobInfo: blobInfo,
		store:    store,
		pieceID:  id,
	}
	return access, nil
}
// PieceID returns the piece ID of the piece.
func (access storedPieceAccess) PieceID() storj.PieceID {
	return access.pieceID
}
// Satellite returns the satellite ID that owns the piece, decoded from the
// blob's namespace.
func (access storedPieceAccess) Satellite() (storj.NodeID, error) {
	return storj.NodeIDFromBytes(access.BlobRef().Namespace)
}
// Size gives the size of the piece on disk, and the size of the content (not including the piece header, if applicable).
func (access storedPieceAccess) Size(ctx context.Context) (size, contentSize int64, err error) {
	defer mon.Task()(&ctx)(&err)
	stat, err := access.Stat(ctx)
	if err != nil {
		return 0, 0, err
	}
	onDisk := stat.Size()
	content := onDisk
	if access.StorageFormatVersion() >= filestore.FormatV1 {
		// V1+ blobs reserve a fixed header area that is not piece content.
		content = onDisk - V1PieceHeaderReservedArea
	}
	return onDisk, content, nil
}
// CreationTime returns the piece creation time as given in the original PieceHash (which is likely
// not the same as the file mtime). This requires opening the file and unmarshaling the piece
// header. If exact precision is not required, ModTime() may be a better solution.
func (access storedPieceAccess) CreationTime(ctx context.Context) (cTime time.Time, err error) {
	defer mon.Task()(&ctx)(&err)
	satellite, err := access.Satellite()
	if err != nil {
		return time.Time{}, err
	}
	reader, err := access.store.ReaderWithStorageFormat(ctx, satellite, access.PieceID(), access.StorageFormatVersion())
	if err != nil {
		return time.Time{}, err
	}
	// The reader was previously never closed, leaking the underlying blob
	// handle on every call. Close it and fold any Close error into err, the
	// same way MigrateV0ToV1 handles its reader.
	defer func() { err = errs.Combine(err, reader.Close()) }()
	header, err := reader.GetPieceHeader()
	if err != nil {
		return time.Time{}, err
	}
	return header.CreationTime, nil
}
// ModTime returns a less-precise piece creation time than CreationTime, but is generally
// much faster. This gets the piece creation time from the filesystem instead of the
// piece header.
func (access storedPieceAccess) ModTime(ctx context.Context) (mTime time.Time, err error) {
	defer mon.Task()(&ctx)(&err)
	info, err := access.Stat(ctx)
	if err != nil {
		return time.Time{}, err
	}
	return info.ModTime(), nil
}