-
Notifications
You must be signed in to change notification settings - Fork 75
/
retrieve.go
411 lines (365 loc) · 10.4 KB
/
retrieve.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
//此源码被清华学神尹成大魔王专业翻译分析并修改
//尹成QQ77025077
//尹成微信18510341407
//尹成所在QQ群721929980
//尹成邮箱 yinc13@mails.tsinghua.edu.cn
//尹成毕业于清华大学,微软区块链领域全球最有价值专家
//https://mvp.microsoft.com/zh-cn/PublicProfile/4033620
//版权所有2017 Go Ethereum作者
//此文件是Go以太坊库的一部分。
//
//Go-Ethereum库是免费软件:您可以重新分发它和/或修改
//根据GNU发布的较低通用公共许可证的条款
//自由软件基金会,或者许可证的第3版,或者
//(由您选择)任何更高版本。
//
//Go以太坊图书馆的发行目的是希望它会有用,
//但没有任何保证;甚至没有
//适销性或特定用途的适用性。见
//GNU较低的通用公共许可证,了解更多详细信息。
//
//你应该收到一份GNU较低级别的公共许可证副本
//以及Go以太坊图书馆。如果没有,请参见<http://www.gnu.org/licenses/>。
//package light实现可按需检索的状态和链对象
//对于以太坊Light客户端。
package les
import (
"context"
"crypto/rand"
"encoding/binary"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/light"
)
var (
retryQueue = time.Millisecond * 100
softRequestTimeout = time.Millisecond * 500
hardRequestTimeout = time.Second * 10
)
//RetrieveManager是一个位于RequestDistributor之上的层,负责
//按请求ID匹配答复,并处理超时,必要时重新发送。
type retrieveManager struct {
dist *requestDistributor
peers *peerSet
serverPool peerSelector
lock sync.RWMutex
sentReqs map[uint64]*sentReq
}
//validatorfunc是一个处理回复消息的函数
type validatorFunc func(distPeer, *Msg) error
//PeerSelector接收有关响应时间和超时的反馈信息
type peerSelector interface {
adjustResponseTime(*poolEntry, time.Duration, bool)
}
//sentreq表示由retrievemanager发送和跟踪的请求
type sentReq struct {
rm *retrieveManager
req *distReq
id uint64
validate validatorFunc
eventsCh chan reqPeerEvent
stopCh chan struct{}
stopped bool
err error
lock sync.RWMutex //保护对Sento地图的访问
sentTo map[distPeer]sentReqToPeer
lastReqQueued bool //上一个请求已排队,但未发送
lastReqSentTo distPeer //如果不是零,则最后一个请求已发送给给定的对等方,但未超时。
reqSrtoCount int //达到软(但非硬)超时的请求数
}
//SentReqTopeer通知来自对等Goroutine(TryRequest)的请求有关响应
//由给定的对等方传递。每个对等端的每个请求只允许一次传递,
//在此之后,delivered设置为true,响应的有效性将发送到
//有效通道,不接受其他响应。
type sentReqToPeer struct {
delivered bool
valid chan bool
}
//ReqPeerEvent由Peer Goroutine(TryRequest)的请求发送到
//通过eventsch通道请求状态机(retrieveloop)。
type reqPeerEvent struct {
event int
peer distPeer
}
const (
rpSent = iota //如果peer==nil,则不发送(没有合适的peer)
rpSoftTimeout
rpHardTimeout
rpDeliveredValid
rpDeliveredInvalid
)
//newRetrieveManager创建检索管理器
func newRetrieveManager(peers *peerSet, dist *requestDistributor, serverPool peerSelector) *retrieveManager {
return &retrieveManager{
peers: peers,
dist: dist,
serverPool: serverPool,
sentReqs: make(map[uint64]*sentReq),
}
}
//检索发送请求(必要时发送给多个对等方)并等待响应
//通过传递函数传递并由
//验证程序回调。当传递有效答案或上下文为
//取消。
func (rm *retrieveManager) retrieve(ctx context.Context, reqID uint64, req *distReq, val validatorFunc, shutdown chan struct{}) error {
sentReq := rm.sendReq(reqID, req, val)
select {
case <-sentReq.stopCh:
case <-ctx.Done():
sentReq.stop(ctx.Err())
case <-shutdown:
sentReq.stop(fmt.Errorf("Client is shutting down"))
}
return sentReq.getError()
}
//sendreq启动一个进程,该进程不断尝试为
//在停止或成功之前,从任何合适的对等方请求。
func (rm *retrieveManager) sendReq(reqID uint64, req *distReq, val validatorFunc) *sentReq {
r := &sentReq{
rm: rm,
req: req,
id: reqID,
sentTo: make(map[distPeer]sentReqToPeer),
stopCh: make(chan struct{}),
eventsCh: make(chan reqPeerEvent, 10),
validate: val,
}
canSend := req.canSend
req.canSend = func(p distPeer) bool {
//为cansend添加额外的检查:请求以前没有发送到同一个对等机
r.lock.RLock()
_, sent := r.sentTo[p]
r.lock.RUnlock()
return !sent && canSend(p)
}
request := req.request
req.request = func(p distPeer) func() {
//在实际发送请求之前,在sentto映射中输入一个条目
r.lock.Lock()
r.sentTo[p] = sentReqToPeer{false, make(chan bool, 1)}
r.lock.Unlock()
return request(p)
}
rm.lock.Lock()
rm.sentReqs[reqID] = r
rm.lock.Unlock()
go r.retrieveLoop()
return r
}
//LES协议管理器调用Deliver将回复消息传递给等待的请求。
func (rm *retrieveManager) deliver(peer distPeer, msg *Msg) error {
rm.lock.RLock()
req, ok := rm.sentReqs[msg.ReqID]
rm.lock.RUnlock()
if ok {
return req.deliver(peer, msg)
}
return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
}
//reqstatefn表示检索循环状态机的状态
type reqStateFn func() reqStateFn
//RetrieveLoop是检索状态机事件循环
func (r *sentReq) retrieveLoop() {
go r.tryRequest()
r.lastReqQueued = true
state := r.stateRequesting
for state != nil {
state = state()
}
r.rm.lock.Lock()
delete(r.rm.sentReqs, r.id)
r.rm.lock.Unlock()
}
//状态请求:请求最近已排队或发送;当达到软超时时,
//将向新对等发送新请求
func (r *sentReq) stateRequesting() reqStateFn {
select {
case ev := <-r.eventsCh:
r.update(ev)
switch ev.event {
case rpSent:
if ev.peer == nil {
//请求发送失败,没有合适的对等机
if r.waiting() {
//我们已经在等待可能成功的已发送请求,请继续等待
return r.stateNoMorePeers
}
//无需等待,无需询问其他对等方,返回时出错
r.stop(light.ErrNoPeers)
//无需转到停止状态,因为waiting()已返回false
return nil
}
case rpSoftTimeout:
//上次请求超时,请尝试询问新的对等方
go r.tryRequest()
r.lastReqQueued = true
return r.stateRequesting
case rpDeliveredValid:
r.stop(nil)
return r.stateStopped
}
return r.stateRequesting
case <-r.stopCh:
return r.stateStopped
}
}
//StateNoMorepeers:无法发送更多请求,因为没有合适的对等机可用。
//对等方稍后可能会适合某个请求,或者可能会出现新的对等方,因此我们
//继续努力。
func (r *sentReq) stateNoMorePeers() reqStateFn {
select {
case <-time.After(retryQueue):
go r.tryRequest()
r.lastReqQueued = true
return r.stateRequesting
case ev := <-r.eventsCh:
r.update(ev)
if ev.event == rpDeliveredValid {
r.stop(nil)
return r.stateStopped
}
return r.stateNoMorePeers
case <-r.stopCh:
return r.stateStopped
}
}
//Statestopped:请求成功或取消,只是等待一些对等方
//要么回答,要么很难超时
func (r *sentReq) stateStopped() reqStateFn {
for r.waiting() {
r.update(<-r.eventsCh)
}
return nil
}
//更新根据事件更新排队/发送标志和超时对等计数器
func (r *sentReq) update(ev reqPeerEvent) {
switch ev.event {
case rpSent:
r.lastReqQueued = false
r.lastReqSentTo = ev.peer
case rpSoftTimeout:
r.lastReqSentTo = nil
r.reqSrtoCount++
case rpHardTimeout:
r.reqSrtoCount--
case rpDeliveredValid, rpDeliveredInvalid:
if ev.peer == r.lastReqSentTo {
r.lastReqSentTo = nil
} else {
r.reqSrtoCount--
}
}
}
//如果检索机制正在等待来自的答案,则waiting返回true。
//任何同龄人
func (r *sentReq) waiting() bool {
return r.lastReqQueued || r.lastReqSentTo != nil || r.reqSrtoCount > 0
}
//TryRequest尝试将请求发送到新的对等机,并等待它
//如果发送成功或超时。它还发送适当的reqpeerEvent
//发送到请求的事件通道的消息。
func (r *sentReq) tryRequest() {
sent := r.rm.dist.queue(r.req)
var p distPeer
select {
case p = <-sent:
case <-r.stopCh:
if r.rm.dist.cancel(r.req) {
p = nil
} else {
p = <-sent
}
}
r.eventsCh <- reqPeerEvent{rpSent, p}
if p == nil {
return
}
reqSent := mclock.Now()
srto, hrto := false, false
r.lock.RLock()
s, ok := r.sentTo[p]
r.lock.RUnlock()
if !ok {
panic(nil)
}
defer func() {
//向服务器池发送反馈,并在发生硬超时时删除对等机
pp, ok := p.(*peer)
if ok && r.rm.serverPool != nil {
respTime := time.Duration(mclock.Now() - reqSent)
r.rm.serverPool.adjustResponseTime(pp.poolEntry, respTime, srto)
}
if hrto {
pp.Log().Debug("Request timed out hard")
if r.rm.peers != nil {
r.rm.peers.Unregister(pp.id)
}
}
r.lock.Lock()
delete(r.sentTo, p)
r.lock.Unlock()
}()
select {
case ok := <-s.valid:
if ok {
r.eventsCh <- reqPeerEvent{rpDeliveredValid, p}
} else {
r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p}
}
return
case <-time.After(softRequestTimeout):
srto = true
r.eventsCh <- reqPeerEvent{rpSoftTimeout, p}
}
select {
case ok := <-s.valid:
if ok {
r.eventsCh <- reqPeerEvent{rpDeliveredValid, p}
} else {
r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p}
}
case <-time.After(hardRequestTimeout):
hrto = true
r.eventsCh <- reqPeerEvent{rpHardTimeout, p}
}
}
//传递属于此请求的答复
func (r *sentReq) deliver(peer distPeer, msg *Msg) error {
r.lock.Lock()
defer r.lock.Unlock()
s, ok := r.sentTo[peer]
if !ok || s.delivered {
return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID)
}
valid := r.validate(peer, msg) == nil
r.sentTo[peer] = sentReqToPeer{true, s.valid}
s.valid <- valid
if !valid {
return errResp(ErrInvalidResponse, "reqID = %v", msg.ReqID)
}
return nil
}
//停止停止检索进程并设置将返回的错误代码
//通过获得错误
func (r *sentReq) stop(err error) {
r.lock.Lock()
if !r.stopped {
r.stopped = true
r.err = err
close(r.stopCh)
}
r.lock.Unlock()
}
//GetError返回任何检索错误(由
//停止功能)停止后
func (r *sentReq) getError() error {
return r.err
}
//genreqid生成新的随机请求ID
func genReqID() uint64 {
var rnd [8]byte
rand.Read(rnd[:])
return binary.BigEndian.Uint64(rnd[:])
}