-
Notifications
You must be signed in to change notification settings - Fork 179
/
heroQueue.go
72 lines (59 loc) · 1.91 KB
/
heroQueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package queue
import (
"sync"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
herocache "github.com/onflow/flow-go/module/mempool/herocache/backdata"
"github.com/onflow/flow-go/module/mempool/herocache/backdata/heropool"
)
// HeroQueue implements a HeroCache-based in-memory queue.
// HeroCache is a key-value cache with zero heap allocation and optimized Garbage Collection.
type HeroQueue struct {
mu sync.RWMutex
cache *herocache.Cache
sizeLimit uint
}
func NewHeroQueue(sizeLimit uint32, logger zerolog.Logger, collector module.HeroCacheMetrics) *HeroQueue {
return &HeroQueue{
cache: herocache.NewCache(
sizeLimit,
herocache.DefaultOversizeFactor,
heropool.NoEjection,
logger.With().Str("mempool", "hero-queue").Logger(),
collector),
sizeLimit: uint(sizeLimit),
}
}
// Push stores the entity into the queue.
// Boolean returned variable determines whether push was successful, i.e.,
// push may be dropped if queue is full or already exists.
func (c *HeroQueue) Push(entity flow.Entity) bool {
c.mu.Lock()
defer c.mu.Unlock()
if c.cache.Size() >= c.sizeLimit {
// we check size before attempt on a push,
// although HeroCache is on no-ejection mode and discards pushes beyond limit,
// we save an id computation by just checking the size here.
return false
}
return c.cache.Add(entity.ID(), entity)
}
// Pop removes and returns the head of queue, and updates the head to the next element.
// Boolean return value determines whether pop is successful, i.e., popping an empty queue returns false.
func (c *HeroQueue) Pop() (flow.Entity, bool) {
c.mu.Lock()
defer c.mu.Unlock()
head, ok := c.cache.Head()
if !ok {
// cache is empty, and there is no head yet to pop.
return nil, false
}
c.cache.Remove(head.ID())
return head, true
}
func (c *HeroQueue) Size() uint {
c.mu.RLock()
defer c.mu.RUnlock()
return c.cache.Size()
}