/
records_book.go
207 lines (189 loc) · 5.16 KB
/
records_book.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
package store
import (
"context"
"encoding"
"errors"
"fmt"
"sync"
"time"
"github.com/sliceledger-blockchain/slice-ledger/op-service/clock"
"github.com/ethereum/go-ethereum/log"
lru "github.com/hashicorp/golang-lru/v2"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
)
const (
// maxPruneBatchSize is the pending-delete count that prune compares against
// before committing the current batch and starting a fresh one.
maxPruneBatchSize = 20
)
// record is the constraint on values kept in a recordsBook. A record exposes
// a last-updated timestamp, which the book uses for expiry checks, and can be
// converted to and from its binary form for persistence in the datastore.
type record interface {
// SetLastUpdated is called by SetRecord with the current clock time
// before a diff is applied.
SetLastUpdated(time.Time)
// LastUpdated is read by hasExpired to decide whether the record is stale.
LastUpdated() time.Time
encoding.BinaryMarshaler
encoding.BinaryUnmarshaler
}
// recordDiff describes a change to a record of type V.
// SetRecord calls Apply on the loaded (or freshly created) record and then
// persists that record.
type recordDiff[V record] interface {
Apply(v V)
}
// UnknownRecordErr is returned by record lookups when there is no usable
// record for a key: either the key is absent from the datastore, or the
// record that was found has expired.
var UnknownRecordErr = errors.New("unknown record")
// recordsBook is a generic K-V store to embed in the extended-peerstore.
// It prunes old entries to keep the store small.
// The recordsBook can be wrapped to customize typing more.
//
// Records are persisted in the datastore under dsBaseKey and fronted by an
// LRU cache. GetRecord takes the embedded read lock and SetRecord takes the
// write lock.
type recordsBook[K ~string, V record] struct {
// ctx is passed to every datastore call; cancelFn cancels it from Close.
ctx context.Context
cancelFn context.CancelFunc
// clock supplies the current time for update stamps and expiry checks.
clock clock.Clock
log log.Logger
// bgTasks is handed to startGc and waited on in Close.
bgTasks sync.WaitGroup
// store is the persistent backing datastore; cache holds recently used records.
store ds.Batching
cache *lru.Cache[K, V]
// newRecord returns a fresh V, used for decoding and for keys with no record yet.
newRecord func() V
// dsBaseKey is the parent of every entry key; dsEntryKey maps a K to its child key.
dsBaseKey ds.Key
dsEntryKey func(K) ds.Key
recordExpiry time.Duration // pruning is disabled if this is 0
sync.RWMutex
}
// newRecordsBook assembles a recordsBook backed by store, with an LRU cache of
// cacheSize entries in front of it.
//
// The supplied ctx is wrapped in a cancellable child context that the book
// uses for its datastore calls and cancels in Close. No background work is
// started here; startGC does that separately.
//
// An error is returned only when the LRU cache cannot be created.
func newRecordsBook[K ~string, V record](ctx context.Context, logger log.Logger, clock clock.Clock, store ds.Batching, cacheSize int, recordExpiry time.Duration,
	dsBaseKey ds.Key, newRecord func() V, dsEntryKey func(K) ds.Key) (*recordsBook[K, V], error) {
	recordCache, err := lru.New[K, V](cacheSize)
	if err != nil {
		return nil, fmt.Errorf("failed to create records cache: %w", err)
	}

	// Derive the book's own context only once the cache is known to be valid,
	// so a failed construction leaves nothing to cancel.
	bookCtx, cancel := context.WithCancel(ctx)

	return &recordsBook[K, V]{
		ctx:          bookCtx,
		cancelFn:     cancel,
		clock:        clock,
		log:          logger,
		store:        store,
		cache:        recordCache,
		newRecord:    newRecord,
		dsBaseKey:    dsBaseKey,
		dsEntryKey:   dsEntryKey,
		recordExpiry: recordExpiry,
	}, nil
}
// startGC hands the book's prune function to startGc so that it runs as a
// background task tracked by bgTasks. It does nothing when recordExpiry is 0.
func (d *recordsBook[K, V]) startGC() {
	if d.recordExpiry != 0 {
		startGc(d.ctx, d.log, d.clock, &d.bgTasks, d.prune)
	}
}
// GetRecord looks up the record stored for key while holding the read lock.
// It returns UnknownRecordErr when the key has no record or the record has
// expired, and a wrapped error when the datastore read or decoding fails.
func (d *recordsBook[K, V]) GetRecord(key K) (V, error) {
	d.RLock()
	defer d.RUnlock()

	return d.getRecord(key)
}
// dsKey returns the datastore key for key: the entry key produced by
// dsEntryKey, placed as a child of dsBaseKey.
func (d *recordsBook[K, V]) dsKey(key K) ds.Key {
	entry := d.dsEntryKey(key)
	return d.dsBaseKey.Child(entry)
}
// deleteRecord removes the record for key from the cache and from the
// datastore.
//
// It returns nil when the delete succeeds or when the datastore reports that
// the key was not found; any other datastore error is returned wrapped.
func (d *recordsBook[K, V]) deleteRecord(key K) error {
	d.cache.Remove(key)
	err := d.store.Delete(d.ctx, d.dsKey(key))
	// A nil error must be handled explicitly: wrapping nil with fmt.Errorf
	// would still produce a non-nil error and report success as failure.
	if err == nil || errors.Is(err, ds.ErrNotFound) {
		return nil
	}
	return fmt.Errorf("failed to delete entry with key %v: %w", key, err)
}
// getRecord resolves key to its record, consulting the cache first and the
// datastore second. It does no locking of its own.
//
// Results:
//   - a cached or stored record that has not expired is returned with a nil error;
//     a record loaded from the datastore is also added to the cache.
//   - UnknownRecordErr when the key is missing from the datastore or the record
//     found (cached or stored) has expired.
//   - a wrapped error when the datastore read fails or the stored bytes cannot
//     be decoded.
func (d *recordsBook[K, V]) getRecord(key K) (v V, err error) {
	cached, hit := d.cache.Get(key)
	if hit {
		if !d.hasExpired(cached) {
			return cached, nil
		}
		// Expired entries may linger in the cache; treat them as absent.
		return v, UnknownRecordErr
	}

	raw, err := d.store.Get(d.ctx, d.dsKey(key))
	switch {
	case errors.Is(err, ds.ErrNotFound):
		return v, UnknownRecordErr
	case err != nil:
		return v, fmt.Errorf("failed to load value of key %v: %w", key, err)
	}

	v = d.newRecord()
	if decodeErr := v.UnmarshalBinary(raw); decodeErr != nil {
		return v, fmt.Errorf("invalid value for key %v: %w", key, decodeErr)
	}
	if d.hasExpired(v) {
		// Stored but stale: not cached, and reported as unknown.
		return v, UnknownRecordErr
	}

	d.cache.Add(key, v)
	return v, nil
}
// SetRecord applies diff to the record stored for key and persists the result,
// holding the write lock for the whole operation.
//
// If no usable record exists for key (getRecord reports UnknownRecordErr), a
// fresh one from newRecord is used as the starting point. The record's
// last-updated time is set to the current clock time before the diff is
// applied. On success the updated record is written to the datastore, added
// to the cache, and returned.
//
// On any failure (lookup, encoding, or datastore write) a fresh record from
// newRecord is returned together with the error.
func (d *recordsBook[K, V]) SetRecord(key K, diff recordDiff[V]) (V, error) {
	d.Lock()
	defer d.Unlock()
	rec, err := d.getRecord(key)
	// errors.Is matches the sentinel checks used elsewhere in this file and
	// keeps working if the sentinel is ever returned wrapped.
	if errors.Is(err, UnknownRecordErr) { // instantiate new record if it does not exist yet
		rec = d.newRecord()
	} else if err != nil {
		return d.newRecord(), err
	}
	rec.SetLastUpdated(d.clock.Now())
	diff.Apply(rec)
	data, err := rec.MarshalBinary()
	if err != nil {
		return d.newRecord(), fmt.Errorf("failed to encode record for key %v: %w", key, err)
	}
	err = d.store.Put(d.ctx, d.dsKey(key), data)
	if err != nil {
		return d.newRecord(), fmt.Errorf("storing updated record for key %v: %w", key, err)
	}
	// Only cache the record once it has been persisted successfully.
	d.cache.Add(key, rec)
	return rec, nil
}
// prune deletes entries from the store that are older than the configured prune expiration.
// Entries that are eligible for deletion may still be present either because the prune function hasn't yet run or
// because they are still preserved in the in-memory cache after having been deleted from the database.
// Such expired entries are filtered out in getRecord
//
// Deletions are grouped into batches of at most maxPruneBatchSize entries.
// The scan stops early, returning the error, when the book's context is done,
// when the query yields an error, when a stored value cannot be decoded, or
// when a batch operation fails.
func (d *recordsBook[K, V]) prune() error {
	results, err := d.store.Query(d.ctx, query.Query{
		Prefix: d.dsBaseKey.String(),
	})
	if err != nil {
		return err
	}
	// Release the query on every exit path, including the early returns below.
	// The close error is deliberately dropped: the result of the prune itself
	// is what the caller needs.
	defer func() { _ = results.Close() }()

	pending := 0
	batch, err := d.store.Batch(d.ctx)
	if err != nil {
		return err
	}
	for result := range results.Next() {
		// Bail out if the context is done
		select {
		case <-d.ctx.Done():
			return d.ctx.Err()
		default:
		}
		// Surface iteration failures directly instead of trying to decode
		// the empty value that accompanies them.
		if result.Error != nil {
			return result.Error
		}
		v := d.newRecord()
		if err := v.UnmarshalBinary(result.Value); err != nil {
			return err
		}
		if !d.hasExpired(v) {
			continue
		}
		// Flush once the batch is full so it never exceeds maxPruneBatchSize.
		if pending >= maxPruneBatchSize {
			if err := batch.Commit(d.ctx); err != nil {
				return err
			}
			batch, err = d.store.Batch(d.ctx)
			if err != nil {
				return err
			}
			pending = 0
		}
		pending++
		if err := batch.Delete(d.ctx, ds.NewKey(result.Key)); err != nil {
			return err
		}
	}
	// Commit whatever remains in the final, possibly partial, batch.
	return batch.Commit(d.ctx)
}
// hasExpired reports whether v's last-updated time plus recordExpiry lies
// strictly before the current clock time.
//
// NOTE(review): with recordExpiry == 0 this reports any record last updated
// before the current clock time as expired, even though startGC skips pruning
// for that value — confirm that is the intended read-side behaviour.
func (d *recordsBook[K, V]) hasExpired(v V) bool {
	deadline := v.LastUpdated().Add(d.recordExpiry)
	return deadline.Before(d.clock.Now())
}
// Close cancels the book's context and then blocks until every task
// registered on bgTasks has finished.
func (d *recordsBook[K, V]) Close() {
	// Cancel first so background work observing the context can stop.
	d.cancelFn()
	// Then wait for that work to drain before returning to the caller.
	d.bgTasks.Wait()
}