-
Notifications
You must be signed in to change notification settings - Fork 211
/
prune.go
101 lines (92 loc) · 2.56 KB
/
prune.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package prune
import (
"context"
"time"
"go.uber.org/zap"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/activesets"
"github.com/spacemeshos/go-spacemesh/sql/certificates"
"github.com/spacemeshos/go-spacemesh/sql/proposals"
"github.com/spacemeshos/go-spacemesh/sql/transactions"
"github.com/spacemeshos/go-spacemesh/timesync"
)
// Opt is a functional option that customizes a Pruner when passed to New.
type Opt func(*Pruner)
// WithLogger returns an Opt that makes the Pruner log through logger.
func WithLogger(logger *zap.Logger) Opt {
	return func(pruner *Pruner) { pruner.logger = logger }
}
// New builds a Pruner for db.
//
// safeDist and activesetEpoch are stored as given and later consulted by
// Prune. The pruner starts with a no-op logger; every opt is then applied,
// in the order supplied, and may override any default.
func New(db *sql.Database, safeDist uint32, activesetEpoch types.EpochID, opts ...Opt) *Pruner {
	pruner := new(Pruner)
	pruner.logger = zap.NewNop()
	pruner.db = db
	pruner.safeDist = safeDist
	pruner.activesetEpoch = activesetEpoch

	for i := range opts {
		opts[i](pruner)
	}
	return pruner
}
// Pruner deletes old data from the database. Create one with New.
type Pruner struct {
	// logger receives the launch message and pruning errors.
	logger *zap.Logger
	// db is the database that the delete calls run against.
	db *sql.Database

	// safeDist is the number of layers behind the current layer that Prune
	// keeps; it is subtracted from the current layer to get the cutoff.
	safeDist uint32
	// activesetEpoch is the epoch the current epoch must exceed before
	// Prune deletes active sets.
	activesetEpoch types.EpochID
}
// Run prunes the database periodically until ctx is cancelled.
//
// It logs the pruner's configuration once, then repeatedly waits for
// interval, reads the current layer from clock and calls p.Prune with it.
// A pruning error is logged and does not stop the loop. The wait is measured
// from the end of one pruning pass to the start of the next. Run blocks, so
// callers normally start it in its own goroutine.
func Run(ctx context.Context, p *Pruner, clock *timesync.NodeClock, interval time.Duration) {
	p.logger.Info("db pruning launched",
		zap.Uint32("dist", p.safeDist),
		zap.Uint32("active set epoch", p.activesetEpoch.Uint32()),
		zap.Duration("interval", interval),
	)

	// One reusable timer instead of time.After per iteration: no allocation
	// on every pass, and the deferred Stop releases the pending timer as
	// soon as ctx is cancelled instead of leaving it armed for up to interval.
	timer := time.NewTimer(interval)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			current := clock.CurrentLayer()
			if err := p.Prune(current); err != nil {
				p.logger.Error("failed to prune",
					current.Field().Zap(),
					zap.Uint32("dist", p.safeDist),
					zap.Error(err),
				)
			}
			// The channel was just drained, so Reset is safe here. Re-arming
			// after Prune keeps a full interval between pruning passes.
			timer.Reset(interval)
		}
	}
}
// Prune deletes data that is more than safeDist layers behind current.
//
// In order, it deletes proposals, certificates and proposal transactions
// from before the cutoff layer (current - safeDist, clamped at 0), observing
// the duration of each successful step in its latency metric. If the epoch
// of current is greater than activesetEpoch, it also deletes active sets
// from before the previous epoch.
//
// The first failing step aborts the pass: its error is returned as-is, its
// latency is not observed, and the remaining steps are skipped.
func (p *Pruner) Prune(current types.LayerID) error {
	// Clamp the cutoff at layer 0. While current is still within safeDist of
	// the first layer, a plain subtraction would wrap around and produce a
	// huge cutoff, so the deletes below would target (nearly) all stored
	// data rather than none.
	// NOTE(review): assumes LayerID is an unsigned integer type, as the
	// conversion from the uint32 safeDist suggests.
	var oldest types.LayerID
	if dist := types.LayerID(p.safeDist); current > dist {
		oldest = current - dist
	}

	start := time.Now()
	if err := proposals.DeleteBefore(p.db, oldest); err != nil {
		return err
	}
	proposalLatency.Observe(time.Since(start).Seconds())

	start = time.Now()
	if err := certificates.DeleteCertBefore(p.db, oldest); err != nil {
		return err
	}
	certLatency.Observe(time.Since(start).Seconds())

	start = time.Now()
	if err := transactions.DeleteProposalTxsBefore(p.db, oldest); err != nil {
		return err
	}
	propTxLatency.Observe(time.Since(start).Seconds())

	if current.GetEpoch() > p.activesetEpoch {
		start = time.Now()
		epoch := current.GetEpoch()
		if epoch > 0 {
			epoch--
		}
		// current - 1 as activesets will be fetched in hare eligibility oracle
		// for example if we are in epoch 9, we want to prune 7 and below
		// as activesets from 8 will still be needed at the beginning of epoch 9
		if err := activesets.DeleteBeforeEpoch(p.db, epoch); err != nil {
			return err
		}
		activeSetLatency.Observe(time.Since(start).Seconds())
	}
	return nil
}