/
cache.go
451 lines (391 loc) · 10.7 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
package mkvs
import (
"container/list"
"context"
"errors"
"fmt"
"sync"
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
db "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/node"
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/syncer"
)
// errRemoveLocked is a sentinel error returned by tryRemoveNode when eviction
// would remove the currently locked pointer (the one being dereferenced).
var errRemoveLocked = errors.New("mkvs: tried to remove locked pointer")
// cache handles the in-memory tree cache.
//
// Clean nodes are tracked in two LRU lists (internal nodes and leaf values
// separately) so that each can be evicted independently once its configured
// capacity is exceeded.
type cache struct {
	sync.Mutex
	syncer.ProofVerifier
	syncer.SubtreeMerger
	// db is the local node database consulted before the remote syncer.
	db db.NodeDB
	// rs is the remote read syncer used when a node is not available locally.
	rs syncer.ReadSyncer
	// pendingRoot is the pending root which will become the new root if
	// the currently cached contents is committed.
	pendingRoot *node.Pointer
	// syncRoot is the root at which all node database and syncer cache
	// lookups will be done.
	syncRoot node.Root
	// Current size of leaf values.
	valueSize uint64
	// Current number of internal nodes.
	internalNodeCount uint64
	// Maximum capacity of internal nodes.
	nodeCapacity uint64
	// Maximum capacity of leaf values.
	valueCapacity uint64
	// lruInternal is the LRU list of internal nodes (front = most recently used).
	lruInternal *list.List
	// lruInternalPos is the position marker set by markPosition; newly committed
	// internal nodes are inserted after it (see tryCommitNode).
	lruInternalPos *list.Element
	// lruLeaf is the LRU list of leaf nodes (front = most recently used).
	lruLeaf *list.List
	// lruLeafPos is the markPosition marker for the leaf LRU list.
	lruLeafPos *list.Element
}
// MaxPrefetchDepth is the maximum depth of the prefetched tree.
const MaxPrefetchDepth = 255
// newCache constructs a tree cache backed by the given node database and
// remote read syncer. The sync root starts out as an empty root of the given
// type, and default capacities (5000 internal nodes, 16 MiB of leaf values)
// are applied.
func newCache(ndb db.NodeDB, rs syncer.ReadSyncer, rootType node.RootType) *cache {
	cc := &cache{
		db:            ndb,
		rs:            rs,
		lruInternal:   list.New(),
		lruLeaf:       list.New(),
		nodeCapacity:  5000,
		valueCapacity: 16 * 1024 * 1024,
	}
	// By default the sync root is an empty root of the requested type.
	cc.syncRoot.Empty()
	cc.syncRoot.Type = rootType
	return cc
}
// close releases all resources held by the cache and resets it to an
// unusable state; isClosed reports true afterwards.
func (c *cache) close() {
	// Drop all references so cached nodes become collectable.
	c.db = nil
	c.rs = nil
	c.pendingRoot = nil
	c.lruInternal = nil
	c.lruInternalPos = nil
	c.lruLeaf = nil
	c.lruLeafPos = nil

	// Reset the sync root and the usage statistics.
	c.syncRoot = node.Root{}
	c.internalNodeCount = 0
	c.valueSize = 0
}
// isClosed reports whether close has already been called on the cache.
// A nil db reference is used as the closed marker (see close).
func (c *cache) isClosed() bool {
	return c.db == nil
}
// getSyncRoot returns the root used for node database and syncer lookups.
func (c *cache) getSyncRoot() node.Root {
	return c.syncRoot
}
// setSyncRoot sets the root used for node database and syncer lookups.
func (c *cache) setSyncRoot(root node.Root) {
	c.syncRoot = root
}
// setPendingRoot sets the pending root which will become the new root if the
// currently cached contents is committed.
func (c *cache) setPendingRoot(ptr *node.Pointer) {
	c.pendingRoot = ptr
}
// newLeafNodePtr wraps an existing leaf node into a fresh pointer.
func (c *cache) newLeafNodePtr(n *node.LeafNode) *node.Pointer {
	ptr := &node.Pointer{Node: n}
	return ptr
}
// newLeafNode creates a fresh leaf node pointer for the given key/value pair.
func (c *cache) newLeafNode(key node.Key, val []byte) *node.Pointer {
	leaf := &node.LeafNode{
		Key:   key[:],
		Value: val,
	}
	return c.newLeafNodePtr(leaf)
}
// newInternalNodePtr wraps an existing internal node into a fresh pointer.
func (c *cache) newInternalNodePtr(n *node.InternalNode) *node.Pointer {
	ptr := &node.Pointer{Node: n}
	return ptr
}
// newInternalNode creates a fresh internal node pointer with the given label,
// label bit length, optional leaf node and child pointers.
func (c *cache) newInternalNode(label node.Key, labelBitLength node.Depth, leafNode, left, right *node.Pointer) *node.Pointer {
	in := &node.InternalNode{
		Label:          label,
		LabelBitLength: labelBitLength,
		LeafNode:       leafNode,
		Left:           left,
		Right:          right,
	}
	return c.newInternalNodePtr(in)
}
// useNode moves the node to the front of its LRU list, marking it as most
// recently used. Pointers that have not yet been committed to the cache
// (nil LRU element) are ignored.
func (c *cache) useNode(ptr *node.Pointer) {
	elem := ptr.LRU
	if elem == nil {
		return
	}
	if _, isInternal := ptr.Node.(*node.InternalNode); isInternal {
		c.lruInternal.MoveToFront(elem)
	} else if _, isLeaf := ptr.Node.(*node.LeafNode); isLeaf {
		c.lruLeaf.MoveToFront(elem)
	}
}
// markPosition marks the current LRU queue positions as the ones before
// any nodes are visited. Any new nodes committed into the cache after
// this is called will be inserted after the marked position.
//
// This makes it possible to keep the path from the root to the derefed
// node in the cache instead of evicting it.
func (c *cache) markPosition() {
	// Snapshot the current fronts of both LRU lists; tryCommitNode inserts
	// after these markers and rollbackNode/tryRemoveNode clear them when the
	// marked element is removed.
	c.lruInternalPos = c.lruInternal.Front()
	c.lruLeafPos = c.lruLeaf.Front()
}
// tryCommitNode attempts to make a clean node eligible for eviction by
// inserting it into the appropriate LRU list, evicting other nodes first if
// the cache is over capacity.
//
// lockedPtr (may be nil) designates a pointer that must never be evicted
// while making room; if eviction would need to remove it, errRemoveLocked is
// returned and the node is not committed.
func (c *cache) tryCommitNode(ptr, lockedPtr *node.Pointer) error {
	// NOTE(fix): the nil guard must run before ptr is dereferenced -- the
	// previous version called ptr.IsClean() first and only then checked
	// ptr == nil (staticcheck SA5011, nil check after use).
	if ptr == nil {
		return nil
	}
	if !ptr.IsClean() {
		panic("mkvs: commitNode called on dirty node")
	}
	if ptr.Node == nil {
		return nil
	}
	if ptr.LRU != nil {
		// Already committed to cache -- just mark it as most recently used.
		c.useNode(ptr)
		return nil
	}

	// Evict nodes till there is enough capacity.
	switch n := ptr.Node.(type) {
	case *node.InternalNode:
		if c.nodeCapacity > 0 && c.internalNodeCount+1 > c.nodeCapacity {
			if err := c.tryEvictInternal(1, lockedPtr); err != nil {
				return err
			}
		}

		// Insert after the marked position (if any) so the path from the root
		// to the currently dereferenced node stays cached (see markPosition).
		if c.lruInternalPos != nil {
			ptr.LRU = c.lruInternal.InsertAfter(ptr, c.lruInternalPos)
		} else {
			ptr.LRU = c.lruInternal.PushFront(ptr)
		}
		c.internalNodeCount++
	case *node.LeafNode:
		valueSize := n.Size()
		if c.valueCapacity > 0 && c.valueSize+valueSize > c.valueCapacity {
			if err := c.tryEvictLeaf(valueSize, lockedPtr); err != nil {
				return err
			}
		}

		if c.lruLeafPos != nil {
			ptr.LRU = c.lruLeaf.InsertAfter(ptr, c.lruLeafPos)
		} else {
			ptr.LRU = c.lruLeaf.PushFront(ptr)
		}
		c.valueSize += valueSize
	}
	return nil
}
// commitNode makes the node eligible for eviction.
//
// The error from tryCommitNode is deliberately ignored: with a nil locked
// pointer, tryRemoveNode can never return errRemoveLocked, so eviction
// cannot fail.
func (c *cache) commitNode(ptr *node.Pointer) {
	_ = c.tryCommitNode(ptr, nil)
}
// rollbackNode marks a tree node as no longer being eligible for
// eviction due to it becoming dirty.
func (c *cache) rollbackNode(ptr *node.Pointer) {
	elem := ptr.LRU
	if elem == nil {
		// Node has not yet been committed to cache.
		return
	}

	switch nd := ptr.Node.(type) {
	case *node.InternalNode:
		// Clear the markPosition marker if it points at this element.
		if c.lruInternalPos == elem {
			c.lruInternalPos = nil
		}
		c.lruInternal.Remove(elem)
		c.internalNodeCount--
	case *node.LeafNode:
		if c.lruLeafPos == elem {
			c.lruLeafPos = nil
		}
		c.lruLeaf.Remove(elem)
		c.valueSize -= nd.Size()
	}

	ptr.LRU = nil
}
// tryRemoveNode removes a node from the cache; for internal nodes the whole
// cached subtree (leaf node, left and right children) is removed first.
//
// lockedPtr (may be nil) designates a pointer that must not be removed;
// encountering it anywhere in the subtree aborts with errRemoveLocked,
// possibly after part of the subtree has already been removed.
func (c *cache) tryRemoveNode(ptr, lockedPtr *node.Pointer) error {
	if lockedPtr != nil && lockedPtr == ptr {
		return errRemoveLocked
	}
	if ptr.LRU == nil {
		// Node has not yet been committed to cache.
		return nil
	}
	switch n := ptr.Node.(type) {
	case *node.InternalNode:
		// Remove leaf node and subtrees first.
		if n.LeafNode != nil && n.LeafNode.Node != nil {
			if err := c.tryRemoveNode(n.LeafNode, lockedPtr); err != nil {
				return err
			}
			n.LeafNode = nil
		}
		if n.Left != nil && n.Left.Node != nil {
			if err := c.tryRemoveNode(n.Left, lockedPtr); err != nil {
				return err
			}
			n.Left = nil
		}
		if n.Right != nil && n.Right.Node != nil {
			if err := c.tryRemoveNode(n.Right, lockedPtr); err != nil {
				return err
			}
			n.Right = nil
		}
		// Clear the markPosition marker if it points at this element.
		if c.lruInternalPos == ptr.LRU {
			c.lruInternalPos = nil
		}
		c.lruInternal.Remove(ptr.LRU)
		c.internalNodeCount--
	case *node.LeafNode:
		if c.lruLeafPos == ptr.LRU {
			c.lruLeafPos = nil
		}
		c.lruLeaf.Remove(ptr.LRU)
		c.valueSize -= n.Size()
	}
	ptr.Node = nil
	ptr.LRU = nil
	return nil
}
// removeNode removes a tree node.
//
// The error from tryRemoveNode is deliberately ignored: with a nil locked
// pointer it can never return errRemoveLocked.
func (c *cache) removeNode(ptr *node.Pointer) {
	_ = c.tryRemoveNode(ptr, nil)
}
// tryEvictLeaf evicts leaf nodes from the back (least recently used end) of
// the leaf LRU list until there is room for an additional targetCapacity
// bytes of leaf values, or the list is empty.
func (c *cache) tryEvictLeaf(targetCapacity uint64, lockedPtr *node.Pointer) error {
	for c.lruLeaf.Len() > 0 && c.valueSize+targetCapacity > c.valueCapacity {
		victim := c.lruLeaf.Back().Value.(*node.Pointer)
		if !victim.Clean {
			// Dirty nodes must never be on the LRU list.
			panic(fmt.Errorf("mkvs: tried to evict dirty node %v", victim))
		}
		if err := c.tryRemoveNode(victim, lockedPtr); err != nil {
			return err
		}
	}
	return nil
}
// tryEvictInternal evicts internal nodes from the back (least recently used
// end) of the internal LRU list until there is room for targetCapacity more
// internal nodes, or the list is empty.
func (c *cache) tryEvictInternal(targetCapacity uint64, lockedPtr *node.Pointer) error {
	for c.lruInternal.Len() > 0 && c.internalNodeCount+targetCapacity > c.nodeCapacity {
		victim := c.lruInternal.Back().Value.(*node.Pointer)
		if !victim.Clean {
			// Dirty nodes must never be on the LRU list.
			panic(fmt.Errorf("mkvs: tried to evict dirty node %v", victim))
		}
		if err := c.tryRemoveNode(victim, lockedPtr); err != nil {
			return err
		}
	}
	return nil
}
// readSyncFetcher is a function that is used to fetch proofs from a remote
// tree via the ReadSyncer interface. It receives the pointer being
// dereferenced and must return a proof for either that pointer's hash or
// the current sync root hash (see remoteSync).
type readSyncFetcher func(context.Context, *node.Pointer, syncer.ReadSyncer) (*syncer.Proof, error)
// derefNodePtr dereferences an internal node pointer.
//
// This may result in node database accesses or remote syncing if the node
// is not available locally. Returns (nil, nil) for a nil pointer and for a
// dirty or empty-hash pointer that carries no node.
func (c *cache) derefNodePtr(
	ctx context.Context,
	ptr *node.Pointer,
	fetcher readSyncFetcher,
) (node.Node, error) {
	if ptr == nil {
		return nil, nil
	}

	// Mark the pointer as most recently used so it is not evicted while in use.
	c.useNode(ptr)

	if ptr.Node != nil {
		var refetch bool
		switch n := ptr.Node.(type) {
		case *node.InternalNode:
			// If this is an internal node, check if the leaf node has been evicted.
			// In this case treat it as if we need to re-fetch the node.
			if n.LeafNode != nil && n.LeafNode.Node == nil {
				c.removeNode(ptr)
				refetch = true
			}
		}

		if !refetch {
			return ptr.Node, nil
		}
	}

	if !ptr.Clean || ptr.Hash.IsEmpty() {
		return nil, nil
	}

	// First, attempt to fetch from the local node database.
	n, err := c.db.GetNode(c.syncRoot, ptr)
	switch {
	case err == nil:
		ptr.Node = n
		// Commit node to cache.
		c.commitNode(ptr)
	case errors.Is(err, db.ErrNodeNotFound):
		// NOTE(fix): use errors.Is instead of a bare equality switch so that
		// wrapped ErrNodeNotFound errors from the node database also trigger
		// the remote-sync fallback.
		if c.rs == syncer.NopReadSyncer {
			// Node not found in local node database and no syncer available.
			return nil, err
		}
		if err = c.remoteSync(ctx, ptr, fetcher); err != nil {
			return nil, err
		}
		if ptr.Node == nil {
			return nil, fmt.Errorf("mkvs: received result did not contain node (or cache too small)")
		}
	default:
		return nil, err
	}
	return ptr.Node, nil
}
// remoteSync performs a remote sync with the configured remote syncer.
//
// It fetches a proof via the given fetcher, verifies it against either the
// dereferenced pointer's hash or the current sync root hash, and merges the
// verified subtree into the cache. Running out of cache space while merging
// is not an error as long as the locked pointer itself was populated.
func (c *cache) remoteSync(ctx context.Context, ptr *node.Pointer, fetcher readSyncFetcher) error {
	proof, err := fetcher(ctx, ptr, c.rs)
	if err != nil {
		return err
	}

	// The proof can be for one of two hashes: i) it is either for ptr.Hash in case
	// all the nodes are only contained in the subtree below ptr, or ii) it is for
	// the c.syncRoot.Hash in case it contains nodes outside the subtree.
	var dstPtr *node.Pointer
	var expectedRoot hash.Hash
	switch {
	case proof.UntrustedRoot.Equal(&ptr.Hash):
		dstPtr = ptr
		expectedRoot = ptr.Hash
	case proof.UntrustedRoot.Equal(&c.syncRoot.Hash):
		dstPtr = c.pendingRoot
		expectedRoot = c.syncRoot.Hash
	default:
		// Proof is for an unknown root.
		return fmt.Errorf("mkvs: got proof for unexpected root (%s)", proof.UntrustedRoot)
	}

	// Verify proof.
	subtree, err := c.VerifyProof(ctx, expectedRoot, proof)
	if err != nil {
		return err
	}

	// Merge resulting nodes, committing each one to the cache. The pointer
	// being dereferenced is passed as the locked pointer so that eviction can
	// never remove it while making room for its subtree.
	var commitNode func(*node.Pointer) error
	commitNode = func(p *node.Pointer) error {
		if p == nil || p.Node == nil {
			return nil
		}

		// Try to commit the node. If we fail this means that there is not enough
		// space in the cache to keep the node that we are trying to dereference.
		if err := c.tryCommitNode(p, ptr); err != nil {
			// Failed to commit, make sure to not keep the subtree in memory.
			p.Node = nil
			return err
		}

		// Commit all children.
		if n, ok := p.Node.(*node.InternalNode); ok {
			if err := commitNode(n.Left); err != nil {
				return err
			}
			if err := commitNode(n.Right); err != nil {
				return err
			}
		}
		return nil
	}
	if err := c.MergeVerifiedSubtree(ctx, dstPtr, subtree, commitNode); err != nil {
		// NOTE(fix): use errors.Is instead of == so a wrapped errRemoveLocked
		// from the merge is still recognized as the benign "cache too small"
		// case.
		if errors.Is(err, errRemoveLocked) {
			// Cache is too small, ignore.
			return nil
		}
		return err
	}
	return nil
}