-
Notifications
You must be signed in to change notification settings - Fork 211
/
tx_pool.go
156 lines (140 loc) · 4.66 KB
/
tx_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package miner
import (
"errors"
"fmt"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/pendingtxs"
"github.com/spacemeshos/go-spacemesh/rand"
"sync"
)
// TxMempool is a struct that holds txs received via gossip network.
// It keeps three maps over the pooled transactions and a lock that the
// methods of this type take around accesses to them.
type TxMempool struct {
// txs maps a transaction ID to the pooled transaction.
txs map[types.TransactionID]*types.Transaction
// accounts maps an address to the pending transactions tracked for it;
// entries are created under the transaction's origin address when it is put.
accounts map[types.Address]*pendingtxs.AccountPendingTxs
// txByAddr maps an address to the set of IDs of pooled transactions that
// have it as origin or as recipient.
txByAddr map[types.Address]map[types.TransactionID]struct{}
// mu synchronizes access to the maps above.
mu sync.RWMutex
}
// NewTxMemPool builds an empty TxMempool whose internal maps are all
// allocated, so the result can be used right away.
func NewTxMemPool() *TxMempool {
	pool := new(TxMempool)
	pool.txs = map[types.TransactionID]*types.Transaction{}
	pool.accounts = map[types.Address]*pendingtxs.AccountPendingTxs{}
	pool.txByAddr = map[types.Address]map[types.TransactionID]struct{}{}
	return pool
}
// Get looks up the pooled transaction stored under id. It returns the
// transaction and a nil error when it is present, or a nil transaction and
// an error when the pool holds no transaction with that id.
func (t *TxMempool) Get(id types.TransactionID) (*types.Transaction, error) {
	t.mu.RLock()
	defer t.mu.RUnlock()

	pooled, ok := t.txs[id]
	if !ok {
		return nil, errors.New("transaction not found in mempool")
	}
	return pooled, nil
}
// GetTxIdsByAddress returns the IDs of all pooled transactions from/to the
// given address, in no particular order. It returns nil when the pool holds
// no transaction for addr.
func (t *TxMempool) GetTxIdsByAddress(addr types.Address) []types.TransactionID {
	// txByAddr is written by Put and Invalidate under the write lock, so it
	// must be read under the read lock to avoid a concurrent map access.
	t.mu.RLock()
	defer t.mu.RUnlock()

	txSet := t.txByAddr[addr]
	if len(txSet) == 0 {
		return nil
	}
	ids := make([]types.TransactionID, 0, len(txSet))
	for id := range txSet {
		ids = append(ids, id)
	}
	return ids
}
// GetTxsForBlock picks up to numOfTxs transaction IDs for a block.
//
// For every account in the pool it calls getState to obtain the account's
// nonce and balance and asks the account's pending transactions which of them
// are valid for that state. If the number of candidates does not exceed
// numOfTxs they are all returned; otherwise numOfTxs of them are chosen at
// random. The first error reported by getState aborts the call and is
// returned wrapped in a message naming the address.
func (t *TxMempool) GetTxsForBlock(numOfTxs int, getState func(addr types.Address) (nonce, balance uint64, err error)) ([]types.TransactionID, error) {
	var (
		candidates []types.TransactionID
		stateErr   error
	)

	// Collect candidates under the read lock; on failure remember the error
	// and leave the loop so the lock is released in a single place.
	t.mu.RLock()
	for addr, account := range t.accounts {
		nonce, balance, err := getState(addr)
		if err != nil {
			stateErr = fmt.Errorf("failed to get state for addr %s: %v", addr.Short(), err)
			break
		}
		validIds, _, _ := account.ValidTxs(nonce, balance)
		candidates = append(candidates, validIds...)
	}
	t.mu.RUnlock()

	if stateErr != nil {
		return nil, stateErr
	}
	if numOfTxs >= len(candidates) {
		return candidates, nil
	}

	// More candidates than requested: keep only a random subset.
	chosen := getRandIdxs(numOfTxs, len(candidates))
	var selection []types.TransactionID
	for pos := range chosen {
		//noinspection GoNilness
		selection = append(selection, candidates[pos])
	}
	return selection, nil
}
// getRandIdxs returns a set of distinct, randomly chosen indices, each in the
// range [0, spaceSize).
//
// The set holds numOfTxs indices, capped at spaceSize because no more distinct
// indices exist. It is empty (but non-nil) when numOfTxs or spaceSize is not
// positive.
func getRandIdxs(numOfTxs, spaceSize int) map[uint64]struct{} {
	// With no index space there is nothing to pick, and the modulus below
	// would divide by zero.
	if numOfTxs <= 0 || spaceSize <= 0 {
		return map[uint64]struct{}{}
	}
	// Asking for more distinct indices than exist could never be satisfied
	// and the loop below would spin forever.
	if numOfTxs > spaceSize {
		numOfTxs = spaceSize
	}

	idxs := make(map[uint64]struct{}, numOfTxs)
	for len(idxs) < numOfTxs {
		// Duplicates simply overwrite the existing key, so the loop runs
		// until enough distinct indices were drawn.
		idxs[rand.Uint64()%uint64(spaceSize)] = struct{}{}
	}
	return idxs
}
// Put inserts a transaction into the mem pool under the given id. It also
// registers the transaction with the pending transactions of its origin
// account and indexes its id by both the origin and the recipient address.
func (t *TxMempool) Put(id types.TransactionID, tx *types.Transaction) {
	t.mu.Lock()
	// Deferred so the write lock is released even if one of the calls below
	// panics; otherwise every later pool operation would block forever.
	defer t.mu.Unlock()

	t.txs[id] = tx
	origin := tx.Origin()
	// NOTE(review): the literal 0 is Add's first argument — presumably the
	// layer the tx was seen in; confirm against pendingtxs.
	t.getOrCreate(origin).Add(0, tx)
	t.addToAddr(origin, id)
	t.addToAddr(tx.Recipient, id)
}
// Invalidate removes the transaction stored under id from the pool, along
// with every transaction the origin account's pending set reports for the
// same nonce. Ids that are not in the pool are ignored.
func (t *TxMempool) Invalidate(id types.TransactionID) {
	t.mu.Lock()
	// Deferred so the write lock is released even if a callee panics.
	defer t.mu.Unlock()

	tx, found := t.txs[id]
	if !found {
		return
	}

	origin := tx.Origin()
	if pendingTxs, found := t.accounts[origin]; found {
		// Once a tx appears in a block we want to invalidate all of this nonce's variants. The mempool currently
		// only accepts one version, but this future-proofs it.
		pendingTxs.RemoveNonce(tx.AccountNonce, func(variantID types.TransactionID) {
			// Drop the variant from the address index too, so the index never
			// lists an id that is no longer present in txs.
			if variant, ok := t.txs[variantID]; ok {
				t.removeFromAddr(variant.Origin(), variantID)
				t.removeFromAddr(variant.Recipient, variantID)
			}
			delete(t.txs, variantID)
		})
		if pendingTxs.IsEmpty() {
			delete(t.accounts, origin)
		}
	}

	// Remove id itself unconditionally: this covers the case where no account
	// entry exists or the callback above was never invoked for id. All three
	// operations are no-ops when the entry is already gone.
	delete(t.txs, id)
	t.removeFromAddr(origin, id)
	t.removeFromAddr(tx.Recipient, id)
}
// GetProjection estimates the nonce and balance of addr, starting from the
// supplied previous nonce and balance. When the pool tracks pending
// transactions for addr the estimate is delegated to them; otherwise the
// previous values are returned unchanged.
func (t *TxMempool) GetProjection(addr types.Address, prevNonce, prevBalance uint64) (nonce, balance uint64) {
	// Only the map lookup happens under the pool lock; the projection itself
	// is computed after the lock has been released.
	t.mu.RLock()
	pending, ok := t.accounts[addr]
	t.mu.RUnlock()

	if ok {
		return pending.GetProjection(prevNonce, prevBalance)
	}
	return prevNonce, prevBalance
}
// getOrCreate returns the pending transactions tracked for addr, creating and
// storing a fresh entry first when none exists yet.
// ⚠️ must be called under write-lock
func (t *TxMempool) getOrCreate(addr types.Address) *pendingtxs.AccountPendingTxs {
	if existing, ok := t.accounts[addr]; ok {
		return existing
	}
	created := pendingtxs.NewAccountPendingTxs()
	t.accounts[addr] = created
	return created
}
// addToAddr records txID in the set of transaction IDs indexed under addr,
// allocating that set on first use.
// ⚠️ must be called under write-lock
func (t *TxMempool) addToAddr(addr types.Address, txID types.TransactionID) {
	if _, ok := t.txByAddr[addr]; !ok {
		t.txByAddr[addr] = make(map[types.TransactionID]struct{})
	}
	t.txByAddr[addr][txID] = struct{}{}
}
// removeFromAddr drops txID from the set of transaction IDs indexed under
// addr and discards the whole set once it is empty. Unknown addresses and
// IDs are ignored.
// ⚠️ must be called under write-lock
func (t *TxMempool) removeFromAddr(addr types.Address, txID types.TransactionID) {
	txSet := t.txByAddr[addr]
	delete(txSet, txID)
	if len(txSet) > 0 {
		return
	}
	// Nothing left for this address: remove its entry from the index.
	delete(t.txByAddr, addr)
}