-
Notifications
You must be signed in to change notification settings - Fork 402
/
trashchore.go
157 lines (131 loc) · 3.82 KB
/
trashchore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package pieces
import (
"context"
"sync"
"time"
"go.uber.org/zap"
"storj.io/common/storj"
"storj.io/common/sync2"
"storj.io/storj/storagenode/trust"
)
// TrashChore is the chore that periodically empties the trash.
//
// Run starts an empty-trash job for every trusted satellite on each cycle,
// and StartRestore starts a restore job for a single satellite on demand.
// Both kinds of job are started on a per-satellite workplace.
type TrashChore struct {
	// log receives the debug, info and error messages of the chore.
	log *zap.Logger
	// trashExpiryInterval is subtracted from the current time to compute the
	// cut-off passed to the store's EmptyTrash.
	trashExpiryInterval time.Duration
	// store is the piece store on which EmptyTrash and RestoreTrash are called.
	store *Store
	// trust supplies the list of satellites to process on each cycle.
	trust *trust.Pool

	// Cycle drives the periodic empty-trash runs; it is created with the
	// choreInterval given to NewTrashChore.
	Cycle *sync2.Cycle

	// started is released by Run; StartRestore waits on it before doing
	// anything else.
	started sync2.Fence
	// root is the context passed to Run. It is handed to every job started on
	// a workplace.
	root context.Context

	// mu is held while done is read or written and while satellites is
	// looked up or extended in ensurePlace.
	mu sync.Mutex
	// done is set once the cycle in Run has returned; ensurePlace returns nil
	// from then on.
	done bool
	// satellites holds one workplace per satellite, created lazily by
	// ensurePlace.
	satellites map[storj.NodeID]*sync2.Workplace
}
// Job tags passed to the per-satellite workplace when a job is started.
// The numbering starts at one; StartRestore compares a tag against
// jobEmptyTrash in the predicate it hands to the workplace.
const (
	// jobEmptyTrash tags the periodic empty-trash job started by Run.
	jobEmptyTrash = iota + 1
	// jobRestoreTrash tags the restore job started by StartRestore.
	jobRestoreTrash
)
// NewTrashChore builds a TrashChore that is ready to be started with Run.
//
// choreInterval is the period of the internal Cycle, i.e. how often the chore
// runs. trashExpiryInterval is the age used by Run to compute the cut-off
// time handed to EmptyTrash, which determines which trashed pieces should be
// deleted. The per-satellite workplace map starts out empty.
func NewTrashChore(log *zap.Logger, choreInterval, trashExpiryInterval time.Duration, trust *trust.Pool, store *Store) *TrashChore {
	chore := new(TrashChore)

	chore.log = log
	chore.store = store
	chore.trust = trust
	chore.trashExpiryInterval = trashExpiryInterval

	chore.Cycle = sync2.NewCycle(choreInterval)
	chore.satellites = make(map[storj.NodeID]*sync2.Workplace)

	return chore
}
// Run starts the cycle and blocks until the cycle returns.
//
// On every cycle it asks the trust pool for the satellites and starts one
// empty-trash job per satellite on that satellite's workplace, then waits for
// all jobs that were started. A limiter of capacity one ensures only a single
// job empties trash at any moment. Failures of EmptyTrash are logged together
// with the satellite they belong to and do not stop the cycle.
//
// Once the cycle has returned, the chore is marked as done, every workplace is
// cancelled and Run waits for each of them before returning the cycle's error.
func (chore *TrashChore) Run(ctx context.Context) (err error) {
	defer mon.Task()(&ctx)(&err)

	// Store the root context before releasing the fence: StartRestore waits on
	// the fence and reads chore.root afterwards.
	chore.root = ctx
	chore.started.Release()

	err = chore.Cycle.Run(ctx, func(ctx context.Context) error {
		chore.log.Debug("starting to empty trash")

		var wg sync.WaitGroup
		limiter := make(chan struct{}, 1)
		for _, satellite := range chore.trust.GetSatellites(ctx) {
			satellite := satellite
			place := chore.ensurePlace(satellite)
			if place == nil {
				// ensurePlace returns nil once the chore is done; treat it the
				// same way StartRestore does instead of using a nil workplace.
				continue
			}
			wg.Add(1)
			ok := place.Start(chore.root, jobEmptyTrash, nil, func(ctx context.Context) {
				defer wg.Done()
				// don't allow multiple trash jobs at the same time
				select {
				case <-ctx.Done():
					return
				case limiter <- struct{}{}:
				}
				defer func() { <-limiter }()

				chore.log.Info("emptying trash started", zap.Stringer("Satellite ID", satellite))
				trashedBefore := time.Now().Add(-chore.trashExpiryInterval)
				err := chore.store.EmptyTrash(ctx, satellite, trashedBefore)
				if err != nil {
					// include the satellite so the failure can be attributed
					chore.log.Error("emptying trash failed", zap.Stringer("Satellite ID", satellite), zap.Error(err))
					return
				}
				chore.log.Info("emptying trash finished", zap.Stringer("Satellite ID", satellite))
			})
			if !ok {
				// the job function was not started, so its deferred wg.Done
				// never runs; balance the Add here.
				wg.Done()
			}
		}
		wg.Wait()
		return nil
	})

	chore.mu.Lock()
	chore.done = true
	chore.mu.Unlock()

	// ensurePlace no longer adds entries once done is set, so the map can be
	// iterated without holding the mutex.
	for _, place := range chore.satellites {
		place.Cancel()
	}
	for _, place := range chore.satellites {
		<-place.Done()
	}

	return err
}
// Close closes the chore by closing its Cycle. It always returns nil.
//
// NOTE(review): presumably closing the Cycle is what makes Run return and
// proceed to cancel the per-satellite jobs — confirm against sync2.Cycle.
func (chore *TrashChore) Close() error {
	chore.Cycle.Close()
	return nil
}
// StartRestore requests a trash restore for the given satellite.
//
// It first waits until Run has started the chore; if ctx ends before that, the
// context's error is returned. If the chore is already shutting down, no job
// is started and context.Canceled is returned. Otherwise a restore job is
// handed to the satellite's workplace and nil is returned; whether the
// workplace actually started the job is not reported to the caller. The
// outcome of the restore itself is only logged.
func (chore *TrashChore) StartRestore(ctx context.Context, satellite storj.NodeID) error {
	if started := chore.started.Wait(ctx); !started {
		return ctx.Err()
	}

	workplace := chore.ensurePlace(satellite)
	if workplace == nil {
		return context.Canceled
	}

	satelliteField := zap.Stringer("Satellite ID", satellite)

	// Predicate handed to the workplace; it matches only the empty-trash job.
	// NOTE(review): presumably this lets a restore cancel a running
	// empty-trash job — confirm against sync2.Workplace.Start.
	matchesEmptyTrash := func(jobID interface{}) bool {
		return jobID == jobEmptyTrash
	}

	restore := func(ctx context.Context) {
		chore.log.Info("restore trash started", satelliteField)
		if err := chore.store.RestoreTrash(ctx, satellite); err != nil {
			chore.log.Error("restore trash failed", satelliteField, zap.Error(err))
			return
		}
		chore.log.Info("restore trash finished", satelliteField)
	}

	workplace.Start(chore.root, jobRestoreTrash, matchesEmptyTrash, restore)
	return nil
}
// ensurePlace returns the workplace of the specified satellite, creating and
// registering a new one on first use. It returns nil once the chore has been
// marked as done, in which case no workplace is created. The lookup and the
// insertion happen while holding the chore's mutex.
func (chore *TrashChore) ensurePlace(satellite storj.NodeID) *sync2.Workplace {
	chore.mu.Lock()
	defer chore.mu.Unlock()

	if chore.done {
		return nil
	}

	if existing, found := chore.satellites[satellite]; found {
		return existing
	}

	created := sync2.NewWorkPlace()
	chore.satellites[satellite] = created
	return created
}