/
purgeable.go
100 lines (87 loc) · 2.45 KB
/
purgeable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package store
import (
"context"
"encoding/binary"
"fmt"
"go.uber.org/zap"
)
// PurgeableMaxBatchSize is the number of accumulated keys at which PurgeKeys
// flushes its pending batch through BatchDelete before collecting more.
var PurgeableMaxBatchSize = 500
// PurgeableKVStore wraps a KVStore and, for every Put, additionally writes a
// height-stamped "deletion key" so that PurgeKeys can later find and remove
// entries written at old block heights.
type PurgeableKVStore struct {
	KVStore
	// tablePrefix is prepended to every deletion key (original note: 0x09 —
	// presumably the prefix byte callers pass; confirm against call sites).
	tablePrefix []byte
	// height is the block height stamped on deletion keys. Each time you are
	// writing a block, start by marking that block (see MarkCurrentHeight) so
	// that all keys written through Put are tagged with it.
	height uint64
	// heightSet records whether MarkCurrentHeight has been called; Put returns
	// an error while it is false.
	heightSet bool
	// ttlInBlocks is subtracted from height in PurgeKeys to compute the upper
	// bound of the heights being purged.
	ttlInBlocks uint64
}
// NewPurgeableStore builds a PurgeableKVStore on top of store.
//
// tablePrefix is stored as-is (not copied) and is used as the leading bytes of
// every deletion key; ttlInBlocks is the distance behind the current height
// used by PurgeKeys. The returned store has no height marked yet, so
// MarkCurrentHeight must be called before Put will succeed.
func NewPurgeableStore(tablePrefix []byte, store KVStore, ttlInBlocks uint64) *PurgeableKVStore {
	purgeable := new(PurgeableKVStore)
	purgeable.KVStore = store
	purgeable.tablePrefix = tablePrefix
	purgeable.ttlInBlocks = ttlInBlocks
	return purgeable
}
// Put stores value under key in the wrapped KVStore and then records a
// deletion key, stamped with the currently marked height, whose value is a
// single 0x00 byte.
//
// It returns an error if no height has been marked yet, or the first error
// reported by the underlying store. The two writes are issued separately: if
// the second one fails, the first has already been submitted.
func (s *PurgeableKVStore) Put(ctx context.Context, key, value []byte) error {
	if !s.heightSet {
		return fmt.Errorf("ephemeral kv store height not set")
	}

	err := s.KVStore.Put(ctx, key, value)
	if err != nil {
		return err
	}

	// Companion entry that lets PurgeKeys find this key by height later on.
	marker := s.deletionKey(s.height, key)
	return s.KVStore.Put(ctx, marker, []byte{0x00})
}
// MarkCurrentHeight records height as the block height that subsequent Put
// calls stamp on their deletion keys, and flags the store as ready for writes.
// A debug line is logged when tracing is enabled.
func (s *PurgeableKVStore) MarkCurrentHeight(height uint64) {
	if traceEnabled {
		zlog.Debug("setting purgeable store height", zap.Uint64("height", height))
	}

	s.height, s.heightSet = height, true
}
// PurgeKeys removes entries that were written at old block heights.
//
// It scans the deletion keys from height 0 up to (current height -
// ttlInBlocks), as bounded by Scan's end-key semantics, and deletes both each
// deletion key and the original key it refers to. Deletions are issued through
// BatchDelete, flushing whenever the pending batch reaches
// PurgeableMaxBatchSize keys.
//
// It returns nil without doing anything when the current height is lower than
// ttlInBlocks. It returns an error if the scan fails or if any batch deletion
// fails; batches already flushed at that point are not rolled back.
func (s *PurgeableKVStore) PurgeKeys(ctx context.Context) error {
	if s.height < s.ttlInBlocks {
		// Nothing can have expired yet; this guard also keeps the
		// subtraction below from wrapping around.
		return nil
	}

	lowBlockNum := uint64(0)
	highBlockNum := s.height - s.ttlInBlocks
	zlog.Debug("purging below",
		zap.Uint64("high_block_num", highBlockNum),
		zap.Uint64("low_block_num", lowBlockNum),
	)

	startKey := s.deletionKey(lowBlockNum, nil)
	endKey := s.deletionKey(highBlockNum, nil)
	itr := s.Scan(ctx, startKey, endKey, Unlimited)

	var batch [][]byte
	for itr.Next() {
		if len(batch) >= PurgeableMaxBatchSize {
			if err := s.KVStore.BatchDelete(ctx, batch); err != nil {
				return fmt.Errorf("unable to delete batch: %w", err)
			}
			// Start a fresh slice rather than truncating, in case the
			// backend still holds a reference to the flushed one.
			batch = nil
		}

		// Each scanned entry yields two deletions: the bookkeeping key
		// itself and the user key embedded in it.
		key := itr.Item().Key
		batch = append(batch, key, s.originalKey(key))
	}

	// A scan that stopped on an error must not be reported as a successful
	// purge.
	if err := itr.Err(); err != nil {
		return fmt.Errorf("unable to scan deletion keys: %w", err)
	}

	// Only flush when something is actually pending; an empty batch would be a
	// pointless round-trip to the backend.
	if len(batch) > 0 {
		if err := s.KVStore.BatchDelete(ctx, batch); err != nil {
			return fmt.Errorf("unable to delete batch: %w", err)
		}
	}
	return nil
}
// deletionKey builds the bookkeeping key for key at the given height:
//
//	tablePrefix | height (8 bytes, big-endian) | key
//
// The big-endian height makes the keys sort by height under byte-wise
// comparison. The result is always written to a freshly allocated buffer:
// appending directly to s.tablePrefix would reuse its backing array whenever
// it has spare capacity, so that successive calls would overwrite each
// other's results.
func (s *PurgeableKVStore) deletionKey(height uint64, key []byte) []byte {
	const heightLen = 8

	prefixLen := len(s.tablePrefix)
	out := make([]byte, prefixLen+heightLen+len(key))
	copy(out, s.tablePrefix)
	binary.BigEndian.PutUint64(out[prefixLen:], height)
	copy(out[prefixLen+heightLen:], key)
	return out
}
// originalKey strips the table prefix and the 8-byte height from a deletion
// key and returns what follows, i.e. the key that was passed to deletionKey.
//
// The result is a sub-slice of deletionKey (no copy is made). The slice
// expression panics if deletionKey is shorter than the prefix plus 8 bytes.
func (s *PurgeableKVStore) originalKey(deletionKey []byte) []byte {
	const heightLen = 8
	return deletionKey[len(s.tablePrefix)+heightLen:]
}