-
Notifications
You must be signed in to change notification settings - Fork 75
/
txrelay.go
183 lines (157 loc) · 4.28 KB
/
txrelay.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
//此源码被清华学神尹成大魔王专业翻译分析并修改
//尹成QQ77025077
//尹成微信18510341407
//尹成所在QQ群721929980
//尹成邮箱 yinc13@mails.tsinghua.edu.cn
//尹成毕业于清华大学,微软区块链领域全球最有价值专家
//https://mvp.microsoft.com/zh-cn/PublicProfile/4033620
//版权所有2016 Go Ethereum作者
//此文件是Go以太坊库的一部分。
//
//Go-Ethereum库是免费软件:您可以重新分发它和/或修改
//根据GNU发布的较低通用公共许可证的条款
//自由软件基金会,或者许可证的第3版,或者
//(由您选择)任何更高版本。
//
//Go以太坊图书馆的发行目的是希望它会有用,
//但没有任何保证;甚至没有
//适销性或特定用途的适用性。见
//GNU较低的通用公共许可证,了解更多详细信息。
//
//你应该收到一份GNU较低级别的公共许可证副本
//以及Go以太坊图书馆。如果没有,请参见<http://www.gnu.org/licenses/>。
package les
import (
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
type ltrInfo struct {
tx *types.Transaction
sentTo map[*peer]struct{}
}
type LesTxRelay struct {
txSent map[common.Hash]*ltrInfo
txPending map[common.Hash]struct{}
ps *peerSet
peerList []*peer
peerStartPos int
lock sync.RWMutex
reqDist *requestDistributor
}
func NewLesTxRelay(ps *peerSet, reqDist *requestDistributor) *LesTxRelay {
r := &LesTxRelay{
txSent: make(map[common.Hash]*ltrInfo),
txPending: make(map[common.Hash]struct{}),
ps: ps,
reqDist: reqDist,
}
ps.notify(r)
return r
}
func (self *LesTxRelay) registerPeer(p *peer) {
self.lock.Lock()
defer self.lock.Unlock()
self.peerList = self.ps.AllPeers()
}
func (self *LesTxRelay) unregisterPeer(p *peer) {
self.lock.Lock()
defer self.lock.Unlock()
self.peerList = self.ps.AllPeers()
}
//send最多向给定数量的对等方发送一个事务列表
//一次,不要将任何特定事务重新发送到同一个对等机两次
func (self *LesTxRelay) send(txs types.Transactions, count int) {
sendTo := make(map[*peer]types.Transactions)
self.peerStartPos++ //旋转对等列表的起始位置
if self.peerStartPos >= len(self.peerList) {
self.peerStartPos = 0
}
for _, tx := range txs {
hash := tx.Hash()
ltr, ok := self.txSent[hash]
if !ok {
ltr = <rInfo{
tx: tx,
sentTo: make(map[*peer]struct{}),
}
self.txSent[hash] = ltr
self.txPending[hash] = struct{}{}
}
if len(self.peerList) > 0 {
cnt := count
pos := self.peerStartPos
for {
peer := self.peerList[pos]
if _, ok := ltr.sentTo[peer]; !ok {
sendTo[peer] = append(sendTo[peer], tx)
ltr.sentTo[peer] = struct{}{}
cnt--
}
if cnt == 0 {
break //发送到所需的对等数
}
pos++
if pos == len(self.peerList) {
pos = 0
}
if pos == self.peerStartPos {
break //已尝试所有可用的对等机
}
}
}
}
for p, list := range sendTo {
pp := p
ll := list
reqID := genReqID()
rq := &distReq{
getCost: func(dp distPeer) uint64 {
peer := dp.(*peer)
return peer.GetRequestCost(SendTxMsg, len(ll))
},
canSend: func(dp distPeer) bool {
return dp.(*peer) == pp
},
request: func(dp distPeer) func() {
peer := dp.(*peer)
cost := peer.GetRequestCost(SendTxMsg, len(ll))
peer.fcServer.QueueRequest(reqID, cost)
return func() { peer.SendTxs(reqID, cost, ll) }
},
}
self.reqDist.queue(rq)
}
}
func (self *LesTxRelay) Send(txs types.Transactions) {
self.lock.Lock()
defer self.lock.Unlock()
self.send(txs, 3)
}
func (self *LesTxRelay) NewHead(head common.Hash, mined []common.Hash, rollback []common.Hash) {
self.lock.Lock()
defer self.lock.Unlock()
for _, hash := range mined {
delete(self.txPending, hash)
}
for _, hash := range rollback {
self.txPending[hash] = struct{}{}
}
if len(self.txPending) > 0 {
txs := make(types.Transactions, len(self.txPending))
i := 0
for hash := range self.txPending {
txs[i] = self.txSent[hash].tx
i++
}
self.send(txs, 1)
}
}
func (self *LesTxRelay) Discard(hashes []common.Hash) {
self.lock.Lock()
defer self.lock.Unlock()
for _, hash := range hashes {
delete(self.txSent, hash)
delete(self.txPending, hash)
}
}