/
txqueue.go
127 lines (104 loc) · 2.68 KB
/
txqueue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package mempool
import (
"crypto/sha256"
"sync"
"github.com/okx/okbchain/libs/tendermint/libs/clist"
"github.com/okx/okbchain/libs/tendermint/types"
)
// ITransactionQueue is the contract for a queue of mempool transactions whose
// entries are handed out as *clist.CElement values. It embeds AddressRecorder,
// so implementations must also provide the per-address methods declared there.
type ITransactionQueue interface {
	// Len returns the number of elements in the queue.
	Len() int
	// Insert adds tx to the queue.
	Insert(tx *mempoolTx) error
	// Remove takes element out of the queue.
	Remove(element *clist.CElement)
	// RemoveByKey removes the element stored under key and returns it.
	// BaseTxQueue returns nil when no element is stored under key.
	RemoveByKey(key [sha256.Size]byte) *clist.CElement
	// Front returns the first element of the queue.
	Front() *clist.CElement
	// Back returns the last element of the queue.
	Back() *clist.CElement
	// BroadcastFront returns the first element of the broadcast view.
	// In BaseTxQueue this is the same element as Front.
	BroadcastFront() *clist.CElement
	// BroadcastLen returns the size of the broadcast view.
	// In BaseTxQueue this is the same value as Len.
	BroadcastLen() int
	// Load looks up the element stored under hash; the bool reports whether
	// an element was found.
	Load(hash [sha256.Size]byte) (*clist.CElement, bool)
	// TxsWaitChan returns a receive-only signalling channel.
	// NOTE(review): presumably it signals that the queue has elements —
	// confirm against clist.CList.WaitChan.
	TxsWaitChan() <-chan struct{}
	AddressRecorder
}
// AddressRecorder groups the per-address queries and cleanup that a
// transaction queue exposes. Addresses are passed as strings.
// NOTE(review): the comments below are inferred from the signatures only;
// confirm the exact semantics against the AddressRecord implementation.
type AddressRecorder interface {
	// GetAddressList returns the addresses known to the recorder.
	GetAddressList() []string
	// GetAddressNonce returns a nonce for address; the bool presumably
	// reports whether the address is known — TODO confirm.
	GetAddressNonce(address string) (uint64, bool)
	// GetAddressTxsCnt returns the number of transactions recorded for address.
	GetAddressTxsCnt(address string) int
	// GetAddressTxs returns transactions recorded for address; max presumably
	// caps how many are returned — TODO confirm.
	GetAddressTxs(address string, max int) types.Txs
	// CleanItems discards entries of address relative to nonce.
	// NOTE(review): whether the bound is inclusive is not visible here.
	CleanItems(address string, nonce uint64)
}
// BaseTxQueue keeps transactions in insertion order in a clist.CList, indexes
// each list element by its tx key in a sync.Map, and embeds *AddressRecord for
// the per-address bookkeeping.
type BaseTxQueue struct {
	txs    *clist.CList // transaction list; Insert appends with PushBack (FIFO)
	txsMap sync.Map     // tx key (as produced by txKey) -> *clist.CElement
	*AddressRecord
}
// NewBaseTxQueue builds an empty BaseTxQueue backed by a fresh clist.CList and
// an AddressRecord created with newAddressRecord(nil). The key map is left at
// its zero value, which is ready to use for a sync.Map.
func NewBaseTxQueue() *BaseTxQueue {
	queue := new(BaseTxQueue)
	queue.txs = clist.New()
	queue.AddressRecord = newAddressRecord(nil)
	return queue
}
// Len returns the length of the underlying transaction list.
func (q *BaseTxQueue) Len() int {
	return q.txs.Len()
}
// Insert appends tx to the back of the transaction list and then indexes the
// resulting list element twice: in the address record and in the key map.
// It always returns nil.
func (q *BaseTxQueue) Insert(tx *mempoolTx) error {
	// Step 1: append to the transaction list.
	element := q.txs.PushBack(tx)

	// Step 2: register the element in the address record.
	// NOTE(review): element.Address is read immediately after PushBack;
	// confirm that PushBack populates it for this queue.
	q.AddressRecord.AddItem(element.Address, element)

	// Step 3: index the element by its tx key.
	key := txKey(element.Value.(*mempoolTx).tx)
	q.txsMap.Store(key, element)

	return nil
}
// Remove takes element out of the transaction list and the key map (via
// removeElement) and then deletes it from the address record.
// element must be non-nil and hold a *mempoolTx in Value, because
// removeElement type-asserts it.
func (q *BaseTxQueue) Remove(element *clist.CElement) {
	q.removeElement(element)
	q.AddressRecord.DeleteItem(element)
}
// RemoveByKey removes the element indexed under key from the key map and the
// transaction list, deletes it from the address record, and returns it.
// It returns nil, touching nothing else, when no element is stored under key.
func (q *BaseTxQueue) RemoveByKey(key [32]byte) (ele *clist.CElement) {
	if ele = q.removeElementByKey(key); ele == nil {
		return nil
	}
	q.AddressRecord.DeleteItem(ele)
	return ele
}
// Front returns the front element of the underlying transaction list.
func (q *BaseTxQueue) Front() *clist.CElement {
	return q.txs.Front()
}
// Back returns the back element of the underlying transaction list.
func (q *BaseTxQueue) Back() *clist.CElement {
	return q.txs.Back()
}
// BroadcastFront returns the first element to broadcast. The base queue keeps
// no separate broadcast list, so this is simply the front of the queue.
func (q *BaseTxQueue) BroadcastFront() *clist.CElement {
	return q.Front()
}
// BroadcastLen returns the number of elements available for broadcast. The
// base queue keeps no separate broadcast list, so this equals Len.
func (q *BaseTxQueue) BroadcastLen() int {
	return q.Len()
}
// TxsWaitChan exposes the wait channel of the underlying transaction list.
// NOTE(review): presumably the channel signals that the list is non-empty —
// confirm against clist.CList.WaitChan.
func (q *BaseTxQueue) TxsWaitChan() <-chan struct{} {
	return q.txs.WaitChan()
}
// Load looks hash up in the key map. It returns the stored list element and
// true on a hit, or nil and false when nothing is stored under hash.
func (q *BaseTxQueue) Load(hash [sha256.Size]byte) (*clist.CElement, bool) {
	if stored, found := q.txsMap.Load(hash); found {
		return stored.(*clist.CElement), true
	}
	return nil, false
}
// removeElement removes element from the transaction list, calls DetachPrev
// on it, and then deletes its entry from the key map. The map key is derived
// with txKey from the *mempoolTx held in element.Value. The address record is
// not touched here.
func (q *BaseTxQueue) removeElement(element *clist.CElement) {
	q.txs.Remove(element)
	element.DetachPrev()

	memTx := element.Value.(*mempoolTx)
	q.txsMap.Delete(txKey(memTx.tx))
}
// removeElementByKey deletes the entry stored under key from the key map and,
// if one existed, removes that element from the transaction list, calls
// DetachPrev on it, and returns it. It returns nil when key is not present.
// The address record is not touched here.
func (q *BaseTxQueue) removeElementByKey(key [32]byte) *clist.CElement {
	stored, loaded := q.txsMap.LoadAndDelete(key)
	if !loaded {
		return nil
	}

	element := stored.(*clist.CElement)
	q.txs.Remove(element)
	element.DetachPrev()
	return element
}
// CleanItems forwards to the embedded AddressRecord's CleanItems, passing
// q.removeElement as the callback so that elements can also be dropped from
// the transaction list and the key map.
// NOTE(review): which elements AddressRecord selects for address and nonce is
// not visible here — confirm against AddressRecord.CleanItems.
func (q *BaseTxQueue) CleanItems(address string, nonce uint64) {
	q.AddressRecord.CleanItems(address, nonce, q.removeElement)
}