-
Notifications
You must be signed in to change notification settings - Fork 387
/
supervisor.go
194 lines (156 loc) · 6.44 KB
/
supervisor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
// Copyright (C) 2023 Storj Labs, Inc.
// See LICENSE for copying information.
package lazyfilewalker
import (
"context"
"time"
"github.com/spacemonkeygo/monkit/v3"
"github.com/zeebo/errs"
"go.uber.org/zap"
"storj.io/common/bloomfilter"
"storj.io/common/storj"
"storj.io/storj/storagenode/pieces/lazyfilewalker/execwrapper"
)
// Names of the lazyfilewalker subcommands. NewSupervisor puts each name at the
// front of the argument list it builds for the matching subprocess, and the
// Walk* methods also use the name for their child logger.
const (
// UsedSpaceFilewalkerCmdName is the name of the used-space-filewalker subcommand.
UsedSpaceFilewalkerCmdName = "used-space-filewalker"
// GCFilewalkerCmdName is the name of the gc-filewalker subcommand.
GCFilewalkerCmdName = "gc-filewalker"
// TrashCleanupFilewalkerCmdName is the name of the trash-cleanup-filewalker subcommand.
TrashCleanupFilewalkerCmdName = "trash-cleanup-filewalker"
)
var (
// errLazyFilewalker is the error class labelled "lazyfilewalker". It is not
// referenced by the code shown in this file; presumably other files of the
// package wrap their errors with it — confirm before removing.
errLazyFilewalker = errs.Class("lazyfilewalker")
// mon is the monkit scope for this package; every Walk* method starts a
// task on it via mon.Task().
mon = monkit.Package()
)
// Supervisor manages the lazyfilewalker subprocesses.
//
// It holds the executable, one argument list per subcommand, and optional
// command overrides installed through the TestingSet*Cmd methods.
//
// TODO: we should keep track of the number of subprocesses we have running and
// limit it to a configurable number, and queue them, since they are run per satellite.
type Supervisor struct {
// log is the base logger; each Walk* method derives a named child logger from it.
log *zap.Logger
// executable is handed to newProcess by every Walk* method
// (presumably the path of the binary to spawn — confirm against callers).
executable string
// Argument lists built by NewSupervisor: the subcommand name followed by config.Args().
gcArgs []string
usedSpaceArgs []string
trashCleanupArgs []string
// Command overrides set through TestingSetGCCmd, TestingSetUsedSpaceCmd and
// TestingSetTrashCleanupCmd; NewSupervisor leaves them at their zero value.
testingGCCmd execwrapper.Command
testingUsedSpaceCmd execwrapper.Command
testingTrashCleanupCmd execwrapper.Command
}
// NewSupervisor creates a new lazy filewalker Supervisor.
//
// Each subcommand receives its own argument slice: the subcommand name followed
// by whatever config.Args() reports. The testing command overrides are left unset.
func NewSupervisor(log *zap.Logger, config Config, executable string) *Supervisor {
	// argsFor starts from a fresh one-element slice, so the three argument
	// lists never share a backing array with each other.
	argsFor := func(cmdName string) []string {
		return append([]string{cmdName}, config.Args()...)
	}

	supervisor := &Supervisor{
		log:        log,
		executable: executable,
	}
	supervisor.gcArgs = argsFor(GCFilewalkerCmdName)
	supervisor.usedSpaceArgs = argsFor(UsedSpaceFilewalkerCmdName)
	supervisor.trashCleanupArgs = argsFor(TrashCleanupFilewalkerCmdName)
	return supervisor
}
// TestingSetGCCmd sets the command for the gc-filewalker subprocess.
// The cmd acts as a replacement for the subprocess.
//
// The stored command is what WalkSatellitePiecesToTrash passes to newProcess.
// The field is assigned without any locking.
func (fw *Supervisor) TestingSetGCCmd(cmd execwrapper.Command) {
fw.testingGCCmd = cmd
}
// TestingSetUsedSpaceCmd sets the command for the used-space-filewalker subprocess.
// The cmd acts as a replacement for the subprocess.
//
// The stored command is what WalkAndComputeSpaceUsedBySatellite passes to
// newProcess. The field is assigned without any locking.
func (fw *Supervisor) TestingSetUsedSpaceCmd(cmd execwrapper.Command) {
fw.testingUsedSpaceCmd = cmd
}
// TestingSetTrashCleanupCmd sets the command for the trash cleanup filewalker subprocess.
// The cmd acts as a replacement for the subprocess.
//
// The stored command is what WalkCleanupTrash passes to newProcess.
// The field is assigned without any locking.
func (fw *Supervisor) TestingSetTrashCleanupCmd(cmd execwrapper.Command) {
fw.testingTrashCleanupCmd = cmd
}
// UsedSpaceRequest is the request struct for the used-space-filewalker process.
// WalkAndComputeSpaceUsedBySatellite builds one per call and passes it to the process run.
type UsedSpaceRequest struct {
// SatelliteID is the satellite given to WalkAndComputeSpaceUsedBySatellite.
SatelliteID storj.NodeID `json:"satelliteID"`
}
// UsedSpaceResponse is the response struct for the used-space-filewalker process.
// WalkAndComputeSpaceUsedBySatellite decodes into it and returns both fields.
type UsedSpaceResponse struct {
// PiecesTotal is returned to the caller as piecesTotal
// (presumably a size in bytes — confirm against the subprocess).
PiecesTotal int64 `json:"piecesTotal"`
// PiecesContentSize is returned to the caller as piecesContentSize
// (presumably a size in bytes — confirm against the subprocess).
PiecesContentSize int64 `json:"piecesContentSize"`
}
// GCFilewalkerRequest is the request struct for the gc-filewalker process.
// WalkSatellitePiecesToTrash builds one per call and passes it to the process run.
type GCFilewalkerRequest struct {
// SatelliteID is the satellite given to WalkSatellitePiecesToTrash.
SatelliteID storj.NodeID `json:"satelliteID"`
// BloomFilter holds the bytes produced by calling Bytes() on the caller's bloom filter.
BloomFilter []byte `json:"bloomFilter"`
// CreatedBefore is the createdBefore argument of WalkSatellitePiecesToTrash
// (presumably a cut-off on piece creation time — confirm against the subprocess).
CreatedBefore time.Time `json:"createdBefore"`
}
// GCFilewalkerResponse is the response struct for the gc-filewalker process.
// WalkSatellitePiecesToTrash decodes into it and returns PieceIDs, PiecesCount
// and PiecesSkippedCount.
type GCFilewalkerResponse struct {
// PieceIDs is returned to the caller as pieceIDs.
PieceIDs []storj.PieceID `json:"pieceIDs"`
// PiecesSkippedCount is returned to the caller as piecesSkipped.
PiecesSkippedCount int64 `json:"piecesSkippedCount"`
// PiecesCount is returned to the caller as piecesCount.
PiecesCount int64 `json:"piecesCount"`
// Completed is decoded but not read by the Supervisor code in this file;
// NOTE(review): presumably consumed by the output handler — confirm.
Completed bool `json:"completed"`
}
// TrashCleanupRequest is the request struct for the trash-cleanup-filewalker process.
// WalkCleanupTrash builds one per call and passes it to the process run.
type TrashCleanupRequest struct {
// SatelliteID is the satellite given to WalkCleanupTrash.
SatelliteID storj.NodeID `json:"satelliteID"`
// DateBefore is the dateBefore argument of WalkCleanupTrash, the time that
// trash directories must be older than to be deleted.
DateBefore time.Time `json:"dateBefore"`
}
// TrashCleanupResponse is the response struct for the trash-cleanup-filewalker process.
// WalkCleanupTrash decodes into it and returns both fields.
type TrashCleanupResponse struct {
// BytesDeleted is returned to the caller as bytesDeleted.
BytesDeleted int64 `json:"bytesDeleted"`
// KeysDeleted is returned to the caller as keysDeleted.
KeysDeleted []storj.PieceID `json:"keysDeleted"`
}
// WalkAndComputeSpaceUsedBySatellite returns the total used space by satellite.
//
// It passes a UsedSpaceRequest for satelliteID to the used-space-filewalker
// process and then decodes a UsedSpaceResponse from the writer that was given
// to the run. If the run or the decoding fails, both totals are returned as
// zero together with that error.
func (fw *Supervisor) WalkAndComputeSpaceUsedBySatellite(ctx context.Context, satelliteID storj.NodeID) (piecesTotal int64, piecesContentSize int64, err error) {
	defer mon.Task()(&ctx)(&err)

	request := UsedSpaceRequest{SatelliteID: satelliteID}

	// The child logger carries the subcommand name and the satellite ID.
	logger := fw.log.Named(UsedSpaceFilewalkerCmdName).With(zap.String("satelliteID", satelliteID.String()))
	output := newGenericWriter(logger)

	proc := newProcess(fw.testingUsedSpaceCmd, logger, fw.executable, fw.usedSpaceArgs)
	if err = proc.run(ctx, output, request); err != nil {
		return 0, 0, err
	}

	var response UsedSpaceResponse
	if decodeErr := output.Decode(&response); decodeErr != nil {
		return 0, 0, decodeErr
	}
	return response.PiecesTotal, response.PiecesContentSize, nil
}
// WalkSatellitePiecesToTrash walks the satellite pieces and moves the pieces that are trash to the trash using the trashFunc provided.
//
// A nil filter is a no-op: no process is started and zero values are returned
// with a nil error. Otherwise a GCFilewalkerRequest carrying the filter bytes
// and createdBefore is passed to the gc-filewalker process, and the
// GCFilewalkerResponse decoded afterwards supplies the returned piece IDs and
// counters. trashFunc is handed to the trash handler that serves as the
// process output writer; how it is invoked is not visible in this function.
func (fw *Supervisor) WalkSatellitePiecesToTrash(ctx context.Context, satelliteID storj.NodeID, createdBefore time.Time, filter *bloomfilter.Filter, trashFunc func(pieceID storj.PieceID) error) (pieceIDs []storj.PieceID, piecesCount, piecesSkipped int64, err error) {
	defer mon.Task()(&ctx)(&err)

	if filter == nil {
		return nil, 0, 0, nil
	}

	request := GCFilewalkerRequest{
		SatelliteID:   satelliteID,
		BloomFilter:   filter.Bytes(),
		CreatedBefore: createdBefore,
	}

	// The child logger carries the subcommand name and the satellite ID.
	logger := fw.log.Named(GCFilewalkerCmdName).With(zap.String("satelliteID", satelliteID.String()))
	output := newTrashHandler(logger, trashFunc)

	proc := newProcess(fw.testingGCCmd, logger, fw.executable, fw.gcArgs)
	if err = proc.run(ctx, output, request); err != nil {
		return nil, 0, 0, err
	}

	var response GCFilewalkerResponse
	if decodeErr := output.Decode(&response); decodeErr != nil {
		return nil, 0, 0, decodeErr
	}
	return response.PieceIDs, response.PiecesCount, response.PiecesSkippedCount, nil
}
// WalkCleanupTrash deletes per-day trash directories which are older than the given time.
//
// It passes a TrashCleanupRequest for satelliteID and dateBefore to the
// trash-cleanup-filewalker process and then decodes a TrashCleanupResponse
// from the writer that was given to the run. If the run or the decoding
// fails, it returns zero bytes, nil keys and that error.
func (fw *Supervisor) WalkCleanupTrash(ctx context.Context, satelliteID storj.NodeID, dateBefore time.Time) (bytesDeleted int64, keysDeleted []storj.PieceID, err error) {
	defer mon.Task()(&ctx)(&err)

	request := TrashCleanupRequest{
		SatelliteID: satelliteID,
		DateBefore:  dateBefore,
	}

	// The child logger carries the subcommand name and the satellite ID.
	logger := fw.log.Named(TrashCleanupFilewalkerCmdName).With(zap.String("satelliteID", satelliteID.String()))
	output := newGenericWriter(logger)

	proc := newProcess(fw.testingTrashCleanupCmd, logger, fw.executable, fw.trashCleanupArgs)
	if err = proc.run(ctx, output, request); err != nil {
		return 0, nil, err
	}

	var response TrashCleanupResponse
	if decodeErr := output.Decode(&response); decodeErr != nil {
		return 0, nil, decodeErr
	}
	return response.BytesDeleted, response.KeysDeleted, nil
}