-
Notifications
You must be signed in to change notification settings - Fork 390
/
cleaner.go
145 lines (120 loc) · 4.01 KB
/
cleaner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package forgetsatellite
import (
"context"
"time"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/storj"
"storj.io/storj/storagenode/pieces"
"storj.io/storj/storagenode/reputation"
"storj.io/storj/storagenode/satellites"
"storj.io/storj/storagenode/trust"
)
// Cleaner is responsible for cleaning up satellite data.
type Cleaner struct {
log *zap.Logger
store *pieces.Store
trust *trust.Pool
satelliteDB satellites.DB
reputationDB reputation.DB
v0PieceInfoDB pieces.V0PieceInfoDB
usageCache *pieces.BlobsUsageCache
}
// NewCleaner creates a new Cleaner.
func NewCleaner(log *zap.Logger, store *pieces.Store, trust *trust.Pool, usageCache *pieces.BlobsUsageCache, satelliteDB satellites.DB, reputationDB reputation.DB, v0PieceInfoDB pieces.V0PieceInfoDB) *Cleaner {
return &Cleaner{
log: log,
store: store,
trust: trust,
satelliteDB: satelliteDB,
reputationDB: reputationDB,
v0PieceInfoDB: v0PieceInfoDB,
usageCache: usageCache,
}
}
// Run runs the cleaner.
func (c *Cleaner) Run(ctx context.Context, satelliteID storj.NodeID) (err error) {
defer mon.Task()(&ctx, satelliteID)(&err)
logger := c.log.With(zap.Stringer("satelliteID", satelliteID))
defer func() {
if err != nil {
logger.Error("cleanup failed", zap.Error(err))
if err := c.satelliteDB.UpdateSatelliteStatus(ctx, satelliteID, satellites.CleanupFailed); err != nil {
logger.Error("Failed to update satellite status.", zap.Error(err))
}
}
}()
satellite, err := c.satelliteDB.GetSatellite(ctx, satelliteID)
if err != nil {
logger.Error("failed to get satellite info", zap.Error(err))
return err
}
if satellite.Status != satellites.CleanupInProgress {
switch satellite.Status {
case satellites.CleanupSucceeded:
return errs.New("cleanup already completed for satellite")
case satellites.CleanupFailed:
return errs.New("cleanup already failed for satellite")
default:
return errs.New("forget-satellite not initiated for satellite")
}
}
logger.Info("cleaning up satellite data")
if err := c.store.DeleteSatelliteBlobs(ctx, satellite.SatelliteID); err != nil {
return err
}
logger.Info("cleaning up the trash")
// To be sure that we update the usage cache before deleting the trash, we first
// delete the trash by calling EmptyTrash because it updates the usage cache.
// Then we delete the trash folder for the satellite.
err = c.store.EmptyTrash(ctx, satellite.SatelliteID, time.Now())
if err != nil {
return err
}
err = c.usageCache.DeleteTrashNamespace(ctx, satellite.SatelliteID.Bytes())
if err != nil {
return err
}
logger.Info("removing satellite info from reputation DB")
err = c.reputationDB.Delete(ctx, satellite.SatelliteID)
if err != nil {
return err
}
// delete v0 pieces for the satellite, if any.
logger.Info("removing satellite v0 pieces if any")
err = c.v0PieceInfoDB.WalkSatelliteV0Pieces(ctx, c.usageCache, satellite.SatelliteID, func(access pieces.StoredPieceAccess) error {
return c.store.Delete(ctx, satelliteID, access.PieceID())
})
if err != nil {
return err
}
logger.Info("removing satellite from trust cache")
err = c.trust.DeleteSatellite(ctx, satellite.SatelliteID)
if err != nil {
logger.Error("failed to remove satellite from trust cache", zap.Error(err))
return err
}
logger.Info("satellite removed from trust cache")
err = c.satelliteDB.UpdateSatelliteStatus(ctx, satellite.SatelliteID, satellites.CleanupSucceeded)
if err != nil {
return err
}
logger.Info("cleanup completed")
return nil
}
// ListSatellites lists all satellites that are being cleaned up.
func (c *Cleaner) ListSatellites(ctx context.Context) (satelliteIDs []storj.NodeID, err error) {
defer mon.Task()(&ctx)(&err)
sats, err := c.satelliteDB.GetSatellites(ctx)
if err != nil {
return nil, err
}
for _, sat := range sats {
if sat.Status == satellites.CleanupInProgress {
satelliteIDs = append(satelliteIDs, sat.SatelliteID)
}
}
return satelliteIDs, nil
}