package prunable

import (
	copydir "github.com/otiai10/copy"

	"github.com/iotaledger/hive.go/ierrors"
	"github.com/iotaledger/hive.go/kvstore"
	"github.com/iotaledger/hive.go/runtime/ioutils"
	"github.com/iotaledger/hive.go/runtime/options"
	"github.com/iotaledger/iota-core/pkg/core/account"
	"github.com/iotaledger/iota-core/pkg/model"
	"github.com/iotaledger/iota-core/pkg/storage/database"
	"github.com/iotaledger/iota-core/pkg/storage/prunable/epochstore"
	"github.com/iotaledger/iota-core/pkg/storage/utils"
	iotago "github.com/iotaledger/iota.go/v4"
)
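
// Prunable bundles the storage components that can be pruned by epoch: the
// per-epoch bucketed slot stores and a semi-permanent database that holds
// epoch-scoped data (decided upgrade signals, pool rewards, pool stats, and
// committees), each with its own pruning delay.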
type Prunable struct {
	apiProvider       iotago.APIProvider
	prunableSlotStore *BucketManager
	errorHandler      func(error)

	semiPermanentDBConfig database.Config
	semiPermanentDB       *database.DBInstance
	decidedUpgradeSignals *epochstore.BaseStore[model.VersionAndHash]
	poolRewards           *epochstore.EpochKVStore
	poolStats             *epochstore.BaseStore[*model.PoolsStats]
	committee             *epochstore.CachedStore[*account.SeatedAccounts]
}
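
// New creates a Prunable storage rooted at dbConfig.Directory, with the
// semi-permanent database placed in a "semipermanent" subdirectory next to
// the per-epoch buckets. Decided upgrade signals use a fixed pruning delay,
// while pool rewards, pool stats, and committees are retained for the rewards
// retention period of the respective epoch's protocol parameters.
//
// A minimal usage sketch (dbConfig, apiProvider, and the error handler are
// assumed to be supplied by the caller):
//
//	p := prunable.New(dbConfig, apiProvider, func(err error) { log.Println(err) })
//	defer p.Shutdown()
//	lastPrunedEpoch := p.RestoreFromDisk()
//	_ = lastPrunedEpoch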
func New(dbConfig database.Config, apiProvider iotago.APIProvider, errorHandler func(error), opts ...options.Option[BucketManager]) *Prunable {
	dir := utils.NewDirectory(dbConfig.Directory, true)
	semiPermanentDBConfig := dbConfig.WithDirectory(dir.PathWithCreate("semipermanent"))
	// openedCallback is nil because we don't need to do anything when reopening the store.
	semiPermanentDB := database.NewDBInstance(semiPermanentDBConfig, nil)

	rewardPruningDelayFunc := func(epochToPrune iotago.EpochIndex) iotago.EpochIndex {
		return iotago.EpochIndex(apiProvider.APIForEpoch(epochToPrune).ProtocolParameters().RewardsParameters().RetentionPeriod)
	}

	return &Prunable{
		apiProvider:       apiProvider,
		errorHandler:      errorHandler,
		prunableSlotStore: NewBucketManager(dbConfig, errorHandler, opts...),

		semiPermanentDBConfig: semiPermanentDBConfig,
		semiPermanentDB:       semiPermanentDB,
		decidedUpgradeSignals: epochstore.NewStore(
			kvstore.Realm{epochPrefixDecidedUpgradeSignals},
			semiPermanentDB.KVStore(),
			func(_ iotago.EpochIndex) iotago.EpochIndex { return pruningDelayDecidedUpgradeSignals },
			model.VersionAndHash.Bytes,
			model.VersionAndHashFromBytes,
		),
		poolRewards: epochstore.NewEpochKVStore(
			kvstore.Realm{epochPrefixPoolRewards},
			semiPermanentDB.KVStore(),
			rewardPruningDelayFunc,
		),
		poolStats: epochstore.NewStore(
			kvstore.Realm{epochPrefixPoolStats},
			semiPermanentDB.KVStore(),
			rewardPruningDelayFunc,
			(*model.PoolsStats).Bytes,
			model.PoolsStatsFromBytes,
		),
		committee: epochstore.NewCachedStore(
			epochstore.NewStore(
				kvstore.Realm{epochPrefixCommittee},
				semiPermanentDB.KVStore(),
				rewardPruningDelayFunc,
				(*account.SeatedAccounts).Bytes,
				account.SeatedAccountsFromBytes,
			),
			5,
		),
	}
}
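
// Clone copies the prunable storage of source on disk to dbConfig.Directory
// and returns a new Prunable instance operating on that copy. The source's
// databases are locked and closed while the directory is copied.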
func Clone(source *Prunable, dbConfig database.Config, apiProvider iotago.APIProvider, errorHandler func(error), opts ...options.Option[BucketManager]) (*Prunable, error) {
	// Lock the semi-permanent DB and the prunable slot store so that nobody can use or open them while cloning.
	source.semiPermanentDB.LockAccess()
	defer source.semiPermanentDB.UnlockAccess()

	source.prunableSlotStore.Lock()
	defer source.prunableSlotStore.Unlock()

	// Close the source's prunable storage before copying its contents. All necessary locks are already acquired.
	source.semiPermanentDB.CloseWithoutLocking()
	source.prunableSlotStore.CloseWithoutLocking()

	// Copy the storage on disk to the new location.
	if err := copydir.Copy(source.prunableSlotStore.dbConfig.Directory, dbConfig.Directory); err != nil {
		return nil, ierrors.Wrap(err, "failed to copy prunable storage directory to new storage path")
	}

	return New(dbConfig, apiProvider, errorHandler, opts...), nil
}
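
// RestoreFromDisk restores the last pruned epoch of every component from disk
// and returns the last pruned epoch of the bucketed slot storage.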
func (p *Prunable) RestoreFromDisk() (lastPrunedEpoch iotago.EpochIndex) {
	lastPrunedEpoch = p.prunableSlotStore.RestoreFromDisk()

	if err := p.decidedUpgradeSignals.RestoreLastPrunedEpoch(); err != nil {
		p.errorHandler(err)
	}
	if err := p.poolRewards.RestoreLastPrunedEpoch(); err != nil {
		p.errorHandler(err)
	}
	if err := p.poolStats.RestoreLastPrunedEpoch(); err != nil {
		p.errorHandler(err)
	}
	if err := p.committee.RestoreLastPrunedEpoch(); err != nil {
		p.errorHandler(err)
	}

	return
}
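
// Prune prunes the given epoch from the bucketed slot storage and from each
// of the epoch-based stores, each of which applies its own pruning delay.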
func (p *Prunable) Prune(epoch iotago.EpochIndex, defaultPruningDelay iotago.EpochIndex) error {
	// Prune the prunable_slot storage (the per-epoch buckets).
	if err := p.prunableSlotStore.Prune(epoch); err != nil {
		return ierrors.Wrapf(err, "prune prunableSlotStore failed for epoch %d", epoch)
	}

	// Prune the prunable_epoch stores: each component has its own pruning delay.
	if _, err := p.decidedUpgradeSignals.Prune(epoch, defaultPruningDelay); err != nil {
		return ierrors.Wrapf(err, "prune decidedUpgradeSignals failed for epoch %d", epoch)
	}

	if err := p.poolRewards.Prune(epoch, defaultPruningDelay); err != nil {
		return ierrors.Wrapf(err, "prune poolRewards failed for epoch %d", epoch)
	}

	if _, err := p.poolStats.Prune(epoch, defaultPruningDelay); err != nil {
		return ierrors.Wrapf(err, "prune poolStats failed for epoch %d", epoch)
	}

	if _, err := p.committee.Prune(epoch, defaultPruningDelay); err != nil {
		return ierrors.Wrapf(err, "prune committee failed for epoch %d", epoch)
	}

	return nil
}
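
// BucketSize returns the on-disk size of the bucket that stores the given epoch.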
func (p *Prunable) BucketSize(epoch iotago.EpochIndex) (int64, error) {
	return p.prunableSlotStore.BucketSize(epoch)
}
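
// Size returns the combined on-disk size of the bucketed slot storage and the
// semi-permanent database.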
func (p *Prunable) Size() int64 {
	semiSize, err := ioutils.FolderSize(p.semiPermanentDBConfig.Directory)
	if err != nil {
		p.errorHandler(ierrors.Wrapf(err, "get semiPermanentDB size failed for %s", p.semiPermanentDBConfig.Directory))
	}

	return p.prunableSlotStore.TotalSize() + semiSize
}
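
// Shutdown shuts down the bucketed slot storage and closes the semi-permanent database.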
func (p *Prunable) Shutdown() {
	p.prunableSlotStore.Shutdown()
	p.semiPermanentDB.Close()
}
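
// Flush flushes the pending writes of the bucketed slot storage and the
// semi-permanent database to disk.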
func (p *Prunable) Flush() {
	if err := p.prunableSlotStore.Flush(); err != nil {
		p.errorHandler(err)
	}

	if err := p.semiPermanentDB.KVStore().Flush(); err != nil {
		p.errorHandler(err)
	}
}
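
// Rollback reverts the prunable storage to targetEpoch: it prunes the slots in
// [startPruneRange, endPruneRange], rolls back committee candidates and
// committees that no longer apply, and deletes the buckets of every epoch that
// was rolled back in any component.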
func (p *Prunable) Rollback(targetEpoch iotago.EpochIndex, startPruneRange iotago.SlotIndex, endPruneRange iotago.SlotIndex) error {
	if err := p.prunableSlotStore.PruneSlots(targetEpoch, startPruneRange, endPruneRange); err != nil {
		return ierrors.Wrapf(err, "failed to prune slots in range [%d, %d] from target epoch %d", startPruneRange, endPruneRange, targetEpoch)
	}

	if err := p.rollbackCommitteesCandidates(targetEpoch, startPruneRange); err != nil {
		return ierrors.Wrapf(err, "failed to rollback committee candidates to target epoch %d", targetEpoch)
	}

	lastPrunedCommitteeEpoch, err := p.rollbackCommitteeEpochs(targetEpoch+1, startPruneRange-1)
	if err != nil {
		return ierrors.Wrapf(err, "failed to rollback committee epochs to target epoch %d", targetEpoch)
	}

	var lastPrunedPoolStatsEpoch iotago.EpochIndex
	var lastPrunedDecidedUpgradeSignalsEpoch iotago.EpochIndex
	var lastPrunedPoolRewardsEpoch iotago.EpochIndex

	// Do not roll back the epoch stores if targetSlot is the end of the epoch, because that is when the rewards were calculated.
	if targetSlot := startPruneRange - 1; p.apiProvider.APIForSlot(targetSlot).TimeProvider().EpochEnd(targetEpoch) != targetSlot {
		lastPrunedPoolStatsEpoch, _, err = p.poolStats.RollbackEpochs(targetEpoch)
		if err != nil {
			return ierrors.Wrapf(err, "failed to rollback pool stats epochs to target epoch %d", targetEpoch)
		}

		lastPrunedDecidedUpgradeSignalsEpoch, _, err = p.decidedUpgradeSignals.RollbackEpochs(targetEpoch)
		if err != nil {
			return ierrors.Wrapf(err, "failed to rollback decided upgrade signals epochs to target epoch %d", targetEpoch)
		}

		lastPrunedPoolRewardsEpoch, err = p.poolRewards.RollbackEpochs(targetEpoch)
		if err != nil {
			return ierrors.Wrapf(err, "failed to rollback pool rewards epochs to target epoch %d", targetEpoch)
		}
	}

	// Delete the buckets of every epoch that was rolled back in any component.
	for epochToPrune := targetEpoch + 1; epochToPrune <= max(
		lastPrunedCommitteeEpoch,
		lastPrunedPoolStatsEpoch,
		lastPrunedDecidedUpgradeSignalsEpoch,
		lastPrunedPoolRewardsEpoch,
	); epochToPrune++ {
		p.prunableSlotStore.DeleteBucket(epochToPrune)
	}

	return nil
}
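
// rollbackCommitteeEpochs deletes the stored committees of all epochs from the
// given epoch up to the last accessed committee epoch, wherever
// shouldRollbackCommittee permits it, and returns the last accessed epoch.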
func (p *Prunable) rollbackCommitteeEpochs(epoch iotago.EpochIndex, targetSlot iotago.SlotIndex) (lastPrunedEpoch iotago.EpochIndex, err error) {
	lastAccessedEpoch, err := p.committee.LastAccessedEpoch()
	if err != nil {
		return lastAccessedEpoch, ierrors.Wrap(err, "failed to get last accessed committee epoch")
	}

	for epochToPrune := epoch; epochToPrune <= lastAccessedEpoch; epochToPrune++ {
		if shouldRollback, rollbackErr := p.shouldRollbackCommittee(epochToPrune, targetSlot); rollbackErr != nil {
			return epochToPrune, ierrors.Wrapf(rollbackErr, "error while checking if committee for epoch %d should be rolled back", epochToPrune)
		} else if shouldRollback {
			if err = p.committee.DeleteEpoch(epochToPrune); err != nil {
				return epochToPrune, ierrors.Wrapf(err, "error while deleting committee for epoch %d", epochToPrune)
			}
		}
	}

	return lastAccessedEpoch, nil
}

// shouldRollbackCommittee determines whether the committee for the given epoch should be removed:
// remove the committee for the next epoch only if the forking point is before the point of no return
// and the committee is reused; always remove committees for epochs that are newer than targetSlotEpoch+1.
func (p *Prunable) shouldRollbackCommittee(epoch iotago.EpochIndex, targetSlot iotago.SlotIndex) (bool, error) {
	timeProvider := p.apiProvider.APIForSlot(targetSlot).TimeProvider()
	targetSlotEpoch := timeProvider.EpochFromSlot(targetSlot)
	pointOfNoReturn := timeProvider.EpochEnd(targetSlotEpoch) - p.apiProvider.APIForSlot(targetSlot).ProtocolParameters().MaxCommittableAge()

	if epoch >= targetSlotEpoch+1 {
		if targetSlot < pointOfNoReturn {
			committee, err := p.committee.Load(targetSlotEpoch + 1)
			if err != nil {
				return false, err
			}

			if committee == nil {
				return false, nil
			}

			return committee.IsReused(), nil
		}

		return false, nil
	}

	return true, nil
}
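
// rollbackCommitteesCandidates deletes all committee candidates of the target
// epoch whose candidacy was registered at or after deletionStartSlot.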
func (p *Prunable) rollbackCommitteesCandidates(targetSlotEpoch iotago.EpochIndex, deletionStartSlot iotago.SlotIndex) error {
	candidatesToRollback := make([]iotago.AccountID, 0)

	candidates, err := p.CommitteeCandidates(targetSlotEpoch)
	if err != nil {
		return ierrors.Wrap(err, "failed to get candidates store")
	}

	if err = candidates.Iterate(kvstore.EmptyPrefix, func(accountID iotago.AccountID, candidacySlot iotago.SlotIndex) bool {
		if candidacySlot >= deletionStartSlot {
			candidatesToRollback = append(candidatesToRollback, accountID)
		}

		return true
	}); err != nil {
		return ierrors.Wrap(err, "failed to collect candidates to rollback")
	}

	for _, candidateToRollback := range candidatesToRollback {
		if err = candidates.Delete(candidateToRollback); err != nil {
			return ierrors.Wrapf(err, "failed to rollback candidate %s", candidateToRollback)
		}
	}

	return nil
}