approvals_lru_cache.go
package approvals

import (
    "sync"

    "github.com/hashicorp/golang-lru/v2/simplelru"

    "github.com/onflow/flow-go/model/flow"
)

// LruCache is a wrapper over `simplelru.LRUCache` that provides the API needed for processing result approvals.
// It extends `simplelru.LRUCache` with an additional index for quicker access by execution result.
type LruCache struct {
    lru  simplelru.LRUCache[flow.Identifier, *flow.ResultApproval]
    lock sync.RWMutex
    // secondary index by result ID, since multiple approvals could
    // reference the same result
    byResultID map[flow.Identifier]map[flow.Identifier]struct{}
}

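// NewApprovalsLRUCache creates a new LruCache holding at most `limit` approvals;
// once the limit is reached, the least recently used approval is evicted.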
func NewApprovalsLRUCache(limit uint) *LruCache {
    byResultID := make(map[flow.Identifier]map[flow.Identifier]struct{})
    // the eviction callback is invoked while we are holding the lock,
    // so it is safe to update the secondary index here
    lru, _ := simplelru.NewLRU(int(limit), func(key flow.Identifier, approval *flow.ResultApproval) {
        delete(byResultID[approval.Body.ExecutionResultID], approval.Body.PartialID())
        if len(byResultID[approval.Body.ExecutionResultID]) == 0 {
            delete(byResultID, approval.Body.ExecutionResultID)
        }
    })
    return &LruCache{
        lru:        lru,
        byResultID: byResultID,
    }
}

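// Peek returns the cached approval with the given ID without updating its recency,
// or nil if it is not in the cache.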
func (c *LruCache) Peek(approvalID flow.Identifier) *flow.ResultApproval {
    c.lock.RLock()
    defer c.lock.RUnlock()
    // check if we have it in the cache
    resource, cached := c.lru.Peek(approvalID)
    if cached {
        return resource
    }
    return nil
}

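// Get returns the cached approval with the given ID and marks it as recently used,
// or nil if it is not in the cache.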
func (c *LruCache) Get(approvalID flow.Identifier) *flow.ResultApproval {
    c.lock.Lock()
    defer c.lock.Unlock()
    // check if we have it in the cache
    resource, cached := c.lru.Get(approvalID)
    if cached {
        return resource
    }
    return nil
}

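// TakeByResultID removes and returns all cached approvals that reference the given
// execution result. It returns nil if no such approvals are cached.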
func (c *LruCache) TakeByResultID(resultID flow.Identifier) []*flow.ResultApproval {
    c.lock.Lock()
    defer c.lock.Unlock()
    ids, ok := c.byResultID[resultID]
    if !ok {
        return nil
    }
    approvals := make([]*flow.ResultApproval, 0, len(ids))
    for approvalID := range ids {
        // check if we have it in the cache
        if resource, ok := c.lru.Peek(approvalID); ok {
            // no need to clean up the secondary index here since it is
            // cleaned up in the evict callback
            _ = c.lru.Remove(approvalID)
            approvals = append(approvals, resource)
        }
    }
    return approvals
}

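// Put adds the approval to the cache, keyed by its partial ID, and indexes it in the
// secondary index under the ID of the execution result it approves. If the cache is
// full, the least recently used approval is evicted.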
func (c *LruCache) Put(approval *flow.ResultApproval) {
    approvalID := approval.Body.PartialID()
    resultID := approval.Body.ExecutionResultID
    c.lock.Lock()
    defer c.lock.Unlock()
    // cache the resource and evict the least recently used one if we have reached the limit
    _ = c.lru.Add(approvalID, approval)
    _, ok := c.byResultID[resultID]
    if !ok {
        c.byResultID[resultID] = map[flow.Identifier]struct{}{approvalID: {}}
    } else {
        c.byResultID[resultID][approvalID] = struct{}{}
    }
}

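A minimal usage sketch, assuming it lives in the same package as the cache above; `makeApproval` is a hypothetical constructor passed in only for illustration and is not part of the file:

func exampleUsage(makeApproval func() *flow.ResultApproval) {
    cache := NewApprovalsLRUCache(100) // hold at most 100 approvals

    approval := makeApproval()

    // Put indexes the approval by its partial ID and by the result it approves.
    cache.Put(approval)

    // Peek looks it up without changing recency; Get would mark it recently used.
    if cached := cache.Peek(approval.Body.PartialID()); cached != nil {
        _ = cached // approval is still in the cache
    }

    // TakeByResultID drains every cached approval referencing the result,
    // removing them from the cache as a side effect.
    for _, a := range cache.TakeByResultID(approval.Body.ExecutionResultID) {
        _ = a
    }
}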