-
Notifications
You must be signed in to change notification settings - Fork 211
/
mempool_iterator.go
208 lines (185 loc) · 5.21 KB
/
mempool_iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
package txs
import (
"container/heap"
"go.uber.org/zap"
"github.com/spacemeshos/go-spacemesh/common/types"
)
const (
MinTXGas = uint64(1) // gas required for the most basic transaction
)
type item struct {
*NanoTX
// The index is needed by update and is maintained by the heap.Interface methods.
index int // The index of the item in the heap.
}
type priorityQueue []*item
// Len implements head.Interface.
func (pq priorityQueue) Len() int { return len(pq) }
// Less implements head.Interface.
func (pq priorityQueue) Less(i, j int) bool {
// We want Pop to give us the highest, not lowest, fee, so we use greater than here.
if pq[i].Fee() != pq[j].Fee() {
return pq[i].Fee() > pq[j].Fee()
}
// if fees are equal, we want the older tx first
if !pq[i].Received.Equal(pq[j].Received) {
return pq[i].Received.Before(pq[j].Received)
}
// if fees and timestamps are equal, we want the tx with the lower ID first
return pq[i].ID.Compare(pq[j].ID)
}
// Swap implements head.Interface.
func (pq priorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
// Push implements head.Interface.
func (pq *priorityQueue) Push(i any) {
n := len(*pq)
it := i.(*item)
it.index = n
*pq = append(*pq, it)
}
// Pop implements head.Interface.
func (pq *priorityQueue) Pop() any {
old := *pq
n := len(old)
it := old[n-1]
old[n-1] = nil // avoid memory leak
it.index = -1 // for safety
*pq = old[0 : n-1]
return it
}
// update modifies the fee and value of an item in the queue.
func (pq *priorityQueue) update(it *item, ntx *NanoTX) {
it.NanoTX = ntx
heap.Fix(pq, it.index)
}
// mempoolIterator holds the best transaction from the conservative state mempool.
// not thread-safe.
type mempoolIterator struct {
logger *zap.Logger
gasRemaining uint64
pq priorityQueue
txs map[types.Address][]*NanoTX
}
// newMempoolIterator builds and returns a mempoolIterator.
func newMempoolIterator(logger *zap.Logger, cs conStateCache, gasLimit uint64) *mempoolIterator {
txs := cs.GetMempool(logger)
mi := &mempoolIterator{
logger: logger,
gasRemaining: gasLimit,
pq: make(priorityQueue, 0, len(txs)),
txs: txs,
}
logger.Info("received mempool txs", zap.Int("num_accounts", len(txs)))
mi.buildPQ()
return mi
}
func (mi *mempoolIterator) buildPQ() {
i := 0
for addr, ntxs := range mi.txs {
ntx := ntxs[0]
it := &item{
NanoTX: ntx,
index: i,
}
i++
mi.pq = append(mi.pq, it)
mi.logger.Debug("adding item to pq",
zap.Stringer("tx_id", ntx.ID),
zap.Stringer("address", ntx.Principal),
zap.Uint64("fee", ntx.Fee()),
zap.Uint64("gas", ntx.MaxGas),
zap.Time("received", ntx.Received))
if len(ntxs) == 1 {
delete(mi.txs, addr)
} else {
mi.txs[addr] = ntxs[1:]
}
}
heap.Init(&mi.pq)
}
func (mi *mempoolIterator) getNext(addr types.Address) *NanoTX {
if _, ok := mi.txs[addr]; !ok {
return nil
}
ntx := mi.txs[addr][0]
if len(mi.txs[addr]) == 1 {
delete(mi.txs, addr)
} else {
mi.txs[addr] = mi.txs[addr][1:]
}
return ntx
}
func (mi *mempoolIterator) pop() *NanoTX {
if mi.pq.Len() == 0 || mi.gasRemaining < MinTXGas {
return nil
}
var top *item
for mi.pq.Len() > 0 {
// the first item in priority queue is always the item to be popped with the heap
top = mi.pq[0]
if top.MaxGas <= mi.gasRemaining {
break
}
mi.logger.Debug("tx max gas too high, removing addr from mempool",
zap.Stringer("tx_id", top.ID),
zap.Stringer("address", top.Principal),
zap.Uint64("fee", top.Fee()),
zap.Uint64("gas", top.MaxGas),
zap.Uint64("gas_left", mi.gasRemaining),
zap.Time("received", top.Received),
)
_ = heap.Pop(&mi.pq)
// remove all txs for this principal since we cannot fulfill the lowest nonce for this principal
delete(mi.txs, top.Principal)
top = nil
}
if top == nil {
return nil
}
ntx := top.NanoTX
mi.gasRemaining -= ntx.MaxGas
mi.logger.Debug("popping tx",
zap.Stringer("tx_id", ntx.ID),
zap.Stringer("address", ntx.Principal),
zap.Uint64("fee", ntx.Fee()),
zap.Uint64("gas_used", ntx.MaxGas),
zap.Uint64("gas_left", mi.gasRemaining),
zap.Time("received", ntx.Received))
next := mi.getNext(ntx.Principal)
if next == nil {
mi.logger.Debug("addr txs exhausted", zap.Stringer("address", ntx.Principal))
_ = heap.Pop(&mi.pq)
} else {
mi.logger.Debug("added tx for addr",
zap.Stringer("tx_id", next.ID),
zap.Stringer("address", next.Principal),
zap.Uint64("fee", next.Fee()),
zap.Uint64("gas", next.MaxGas),
zap.Time("received", next.Received))
// updating the item (for the same address) in the heap is less expensive than a pop followed by a push.
mi.pq.update(top, next)
}
return ntx
}
// PopAll returns all the transaction in the mempoolIterator.
func (mi *mempoolIterator) PopAll() ([]*NanoTX, map[types.Address][]*NanoTX) {
result := make([]*NanoTX, 0)
byAddrAndNonce := make(map[types.Address][]*NanoTX)
for {
popped := mi.pop()
if popped == nil {
break
}
result = append(result, popped)
principal := popped.Principal
if _, ok := byAddrAndNonce[principal]; !ok {
byAddrAndNonce[principal] = make([]*NanoTX, 0, maxTXsPerAcct)
}
byAddrAndNonce[principal] = append(byAddrAndNonce[principal], popped)
}
return result, byAddrAndNonce
}