// peer.go
//<developer>
// <name>linapex 曹一峰</name>
// <email>linapex@163.com</email>
// <wx>superexc</wx>
// <qqgroup>128148617</qqgroup>
// <url>https://jsq.ink</url>
// <role>pku engineer</role>
// <date>2019-03-16 19:16:46</date>
//</624450125648891904>
package whisperv6
import (
"fmt"
"math"
"sync"
"time"
mapset "github.com/deckarep/golang-set"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
)
// Peer represents a whisper protocol peer connection.
type Peer struct {
host *Whisper // local Whisper instance; queried for MinPow, BloomFilter, Envelopes and the envelope cache
peer *p2p.Peer // underlying p2p peer; source of the value returned by ID
ws p2p.MsgReadWriter // message stream used to send and receive protocol packets
trusted bool // initialised to false by newPeer
powRequirement float64 // minimum PoW an envelope needs before broadcast sends it to this peer
bloomMu sync.Mutex // held by bloomMatch and setBloomFilter while they touch bloomFilter and fullNode
bloomFilter []byte // bloom filter consulted by bloomMatch when fullNode is false
fullNode bool // when true, bloomMatch accepts every envelope without consulting bloomFilter
known mapset.Set // messages already known by the peer, kept to avoid wasting bandwidth
quit chan struct{} // closed by stop to terminate the update loop
}
// newPeer creates a new whisper peer object, but does not run the handshake
// itself. The returned peer is untrusted, has a zero PoW requirement, an empty
// set of known envelopes, and starts out flagged as a full node with the
// bloom filter produced by MakeFullNodeBloom.
func newPeer(host *Whisper, remote *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
	p := new(Peer)

	// Wiring to the local node and the remote connection.
	p.host = host
	p.peer = remote
	p.ws = rw

	// Per-peer bookkeeping.
	p.known = mapset.NewSet()
	p.quit = make(chan struct{})

	// Until the handshake says otherwise, treat the remote as a full node.
	p.bloomFilter = MakeFullNodeBloom()
	p.fullNode = true

	// trusted and powRequirement intentionally keep their zero values
	// (false and 0.0 respectively).
	return p
}
// start launches the peer's update loop in a new goroutine and logs the
// event at trace level. The loop periodically broadcasts whisper envelopes
// to the peer and expires stale entries.
func (peer *Peer) start() {
	go peer.update()

	id := peer.ID()
	log.Trace("start", "peer", id)
}
// stop terminates the peer's update loop by closing the quit channel, which
// stops messages from being forwarded to the peer, and logs the event at
// trace level.
func (peer *Peer) stop() {
	close(peer.quit)

	id := peer.ID()
	log.Trace("stop", "peer", id)
}
// handshake sends the protocol initiation status message to the remote peer
// and verifies the remote status too.
//
// The local status (protocol version, minimum PoW, bloom filter and
// light-client flag) is sent from a separate goroutine while this goroutine
// reads and validates the remote status packet. Only the protocol version is
// mandatory in the remote status; the PoW requirement, bloom filter and
// light-client flag are optional. When present and valid, the remote PoW
// requirement and bloom filter are stored on the peer.
//
// An error is returned if reading the packet fails, the first packet is not a
// status packet, the status is malformed, the protocol versions differ, both
// sides are light clients while such connections are restricted, or sending
// the local status fails.
func (peer *Peer) handshake() error {
	// Send the handshake status message asynchronously. errc has a buffer of
	// one so the sender goroutine can always complete its send, even when
	// this function returns early with a validation error.
	errc := make(chan error, 1)
	isLightNode := peer.host.LightClientMode()
	isRestrictedLightNodeConnection := peer.host.LightClientModeConnectionRestricted()
	go func() {
		pow := peer.host.MinPow()
		powConverted := math.Float64bits(pow)
		bloom := peer.host.BloomFilter()
		errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom, isLightNode)
	}()

	// Fetch the remote status packet and verify protocol match.
	packet, err := peer.ws.ReadMsg()
	if err != nil {
		return err
	}
	if packet.Code != statusCode {
		return fmt.Errorf("peer [%x] sent packet %x before status packet", peer.ID(), packet.Code)
	}
	s := rlp.NewStream(packet.Payload, uint64(packet.Size))
	_, err = s.List()
	if err != nil {
		return fmt.Errorf("peer [%x] sent bad status message: %v", peer.ID(), err)
	}
	peerVersion, err := s.Uint()
	if err != nil {
		return fmt.Errorf("peer [%x] sent bad status message (unable to decode version): %v", peer.ID(), err)
	}
	if peerVersion != ProtocolVersion {
		return fmt.Errorf("peer [%x]: protocol version mismatch %d != %d", peer.ID(), peerVersion, ProtocolVersion)
	}

	// Only the version is mandatory, subsequent parameters are optional.
	// A decoding failure here therefore just means "not supplied".
	powRaw, err := s.Uint()
	if err == nil {
		pow := math.Float64frombits(powRaw)
		if math.IsInf(pow, 0) || math.IsNaN(pow) || pow < 0.0 {
			return fmt.Errorf("peer [%x] sent bad status message: invalid pow", peer.ID())
		}
		peer.powRequirement = pow

		var bloom []byte
		err = s.Decode(&bloom)
		if err == nil {
			sz := len(bloom)
			if sz != BloomFilterSize && sz != 0 {
				return fmt.Errorf("peer [%x] sent bad status message: wrong bloom filter size %d", peer.ID(), sz)
			}
			peer.setBloomFilter(bloom)
		}
	}

	// The light-client flag is optional too, so its decoding error is
	// deliberately discarded: when the field is absent the flag is false and
	// the restriction check below simply does not trigger.
	isRemotePeerLightNode, _ := s.Bool()
	if isRemotePeerLightNode && isLightNode && isRestrictedLightNodeConnection {
		return fmt.Errorf("peer [%x] is useless: two light client communication restricted", peer.ID())
	}

	// Wait for the outcome of sending our own status message.
	if err := <-errc; err != nil {
		return fmt.Errorf("peer [%x] failed to send status packet: %v", peer.ID(), err)
	}
	return nil
}
// update executes periodic operations on the peer, including message
// transmission and expiration.
//
// It loops until the quit channel is closed or a broadcast fails. Both
// tickers are stopped when the loop exits so that their resources are
// released together with the goroutine.
func (peer *Peer) update() {
	// Start the tickers for the updates and make sure they are stopped on
	// every exit path; an unstopped Ticker keeps its runtime timer alive.
	expire := time.NewTicker(expirationCycle)
	defer expire.Stop()
	transmit := time.NewTicker(transmissionCycle)
	defer transmit.Stop()

	// Loop and transmit until termination is requested.
	for {
		select {
		case <-expire.C:
			peer.expire()

		case <-transmit.C:
			if err := peer.broadcast(); err != nil {
				log.Trace("broadcast failed", "reason", err, "peer", peer.ID())
				return
			}

		case <-peer.quit:
			return
		}
	}
}
// mark records an envelope's hash in the set of envelopes known to the peer,
// so that the envelope won't be sent back to it.
func (peer *Peer) mark(envelope *Envelope) {
	hash := envelope.Hash()
	peer.known.Add(hash)
}
// marked reports whether the envelope's hash is already in the set of
// envelopes known to the remote peer.
func (peer *Peer) marked(envelope *Envelope) bool {
	hash := envelope.Hash()
	return peer.known.Contains(hash)
}
// expire iterates over all the envelope hashes known to the peer and removes
// those that the host no longer has cached (i.e. expired envelopes).
func (peer *Peer) expire() {
	// Collect first and remove afterwards, so the set is not modified while
	// it is being iterated.
	unmark := make(map[common.Hash]struct{})
	peer.known.Each(func(v interface{}) bool {
		hash := v.(common.Hash)
		if !peer.host.isEnvelopeCached(hash) {
			unmark[hash] = struct{}{}
		}
		// golang-set stops iterating as soon as the callback returns true;
		// return false so that every known hash is examined.
		return false
	})
	// Dump all known but no longer cached hashes.
	for hash := range unmark {
		peer.known.Remove(hash)
	}
}
// broadcast iterates over the host's collection of envelopes and transmits
// to the peer, in a single batch, those that the peer is not yet marked as
// knowing, that meet the peer's PoW requirement and that match its bloom
// filter. Envelopes are marked as known only after a successful send.
// It returns the error from the send, or nil (also when nothing was sent).
func (peer *Peer) broadcast() error {
	candidates := peer.host.Envelopes()
	pending := make([]*Envelope, 0, len(candidates))
	for _, env := range candidates {
		if peer.marked(env) {
			continue
		}
		if env.PoW() >= peer.powRequirement && peer.bloomMatch(env) {
			pending = append(pending, env)
		}
	}

	// Nothing new for this peer.
	if len(pending) == 0 {
		return nil
	}

	// Transmit the batch of envelopes.
	if err := p2p.Send(peer.ws, messagesCode, pending); err != nil {
		return err
	}

	// Mark envelopes only if they were successfully sent.
	for _, env := range pending {
		peer.mark(env)
	}
	log.Trace("broadcast", "num. messages", len(pending))
	return nil
}
// ID returns the peer's id as a byte slice.
func (peer *Peer) ID() []byte {
	// The id must live in a local variable before it can be sliced.
	nodeID := peer.peer.ID()
	return nodeID[:]
}
// notifyAboutPowRequirementChange sends a powRequirementCode message to the
// peer carrying the given PoW value encoded as its IEEE 754 bit pattern.
// It returns the error from the send, if any.
func (peer *Peer) notifyAboutPowRequirementChange(pow float64) error {
	return p2p.Send(peer.ws, powRequirementCode, math.Float64bits(pow))
}
// notifyAboutBloomFilterChange sends a bloomFilterExCode message to the peer
// carrying the given bloom filter. It returns the error from the send, if any.
func (peer *Peer) notifyAboutBloomFilterChange(bloom []byte) error {
	err := p2p.Send(peer.ws, bloomFilterExCode, bloom)
	return err
}
// bloomMatch reports whether the envelope should be forwarded to the peer
// as far as bloom filtering is concerned: always true when the peer is
// flagged as a full node, otherwise the result of matching the envelope's
// bloom against the peer's bloom filter. The check runs under bloomMu.
func (peer *Peer) bloomMatch(env *Envelope) bool {
	peer.bloomMu.Lock()
	defer peer.bloomMu.Unlock()

	if peer.fullNode {
		return true
	}
	return BloomFilterMatch(peer.bloomFilter, env.Bloom())
}
// setBloomFilter stores the given bloom filter on the peer and recomputes
// the fullNode flag from it via isFullNode. If the peer turns out to be a
// full node and the supplied filter is nil, the filter produced by
// MakeFullNodeBloom is stored instead. The update runs under bloomMu.
func (peer *Peer) setBloomFilter(bloom []byte) {
	peer.bloomMu.Lock()
	defer peer.bloomMu.Unlock()

	full := isFullNode(bloom)
	if full && bloom == nil {
		bloom = MakeFullNodeBloom()
	}
	peer.bloomFilter = bloom
	peer.fullNode = full
}
// MakeFullNodeBloom returns a newly allocated bloom filter of
// BloomFilterSize bytes with every byte set to 0xFF.
func MakeFullNodeBloom() []byte {
	filter := make([]byte, BloomFilterSize)
	for idx := range filter {
		filter[idx] = 0xFF
	}
	return filter
}