-
Notifications
You must be signed in to change notification settings - Fork 2
/
handler.go
788 lines (715 loc) · 25 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
//<developer>
// <name>linapex 曹一峰</name>
// <email>linapex@163.com</email>
// <wx>superexc</wx>
// <qqgroup>128148617</qqgroup>
// <url>https://jsq.ink</url>
// <role>pku engineer</role>
// <date>2019-03-16 12:09:38</date>
//</624342635972136960>
package eth
import (
"encoding/json"
"errors"
"fmt"
"math"
"math/big"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
"github.com/ethereum/go-ethereum/consensus/misc"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/fetcher"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
)
const (
softResponseLimit = 2 * 1024 * 1024 //返回的块、头或节点数据的目标最大大小。
estHeaderRlpSize = 500 //RLP编码块头的近似大小
//txchanSize是侦听newtxSevent的频道的大小。
//该数字是根据Tx池的大小引用的。
txChanSize = 4096
)
var (
daoChallengeTimeout = 15 * time.Second //允许节点响应DAO握手挑战的时间
)
//如果请求的协议和配置为
//不兼容(低协议版本限制和高要求)。
var errIncompatibleConfig = errors.New("incompatible configuration")
func errResp(code errCode, format string, v ...interface{}) error {
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
}
type ProtocolManager struct {
networkID uint64
fastSync uint32 //标记是否启用快速同步(如果已经有块,则禁用)
acceptTxs uint32 //标记是否被认为是同步的(启用事务处理)
txpool txPool
blockchain *core.BlockChain
chainconfig *params.ChainConfig
maxPeers int
downloader *downloader.Downloader
fetcher *fetcher.Fetcher
peers *peerSet
SubProtocols []p2p.Protocol
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
txsSub event.Subscription
minedBlockSub *event.TypeMuxSubscription
//获取器、同步器、TxSyncLoop的通道
newPeerCh chan *peer
txsyncCh chan *txsync
quitSync chan struct{}
noMorePeers chan struct{}
//等待组用于在下载过程中正常关闭
//加工
wg sync.WaitGroup
}
//NewProtocolManager返回新的以太坊子协议管理器。以太坊子协议管理对等端
//使用以太坊网络。
//NewProtocolManager返回新的以太坊子协议管理器。以太坊子协议管理对等端
//使用以太坊网络。
func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
//使用基本字段创建协议管理器
manager := &ProtocolManager{
networkID: networkID,
eventMux: mux,
txpool: txpool,
blockchain: blockchain,
chainconfig: config,
peers: newPeerSet(),
newPeerCh: make(chan *peer),
noMorePeers: make(chan struct{}),
txsyncCh: make(chan *txsync),
quitSync: make(chan struct{}),
}
//确定是否允许快速同步
if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
log.Warn("Blockchain not empty, fast sync disabled")
mode = downloader.FullSync
}
if mode == downloader.FastSync {
manager.fastSync = uint32(1)
}
//为我们能处理的每个实现版本启动一个子协议
manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
for i, version := range ProtocolVersions {
//如果与操作模式不兼容,则跳过协议版本
if mode == downloader.FastSync && version < eth63 {
continue
}
//兼容;初始化子协议
version := version //跑步结束
manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
Name: ProtocolName,
Version: version,
Length: ProtocolLengths[i],
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
peer := manager.newPeer(int(version), p, rw)
select {
case manager.newPeerCh <- peer:
manager.wg.Add(1)
defer manager.wg.Done()
return manager.handle(peer)
case <-manager.quitSync:
return p2p.DiscQuitting
}
},
NodeInfo: func() interface{} {
return manager.NodeInfo()
},
PeerInfo: func(id discover.NodeID) interface{} {
if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
return p.Info()
}
return nil
},
})
}
if len(manager.SubProtocols) == 0 {
return nil, errIncompatibleConfig
}
//构建不同的同步机制
manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
bi := blockchain.GenesisBlock().Header().BlockInterval
validator := func(header *types.Header) error {
return engine.VerifyHeader(blockchain, header, true,bi)
}
heighter := func() uint64 {
return blockchain.CurrentBlock().NumberU64()
}
inserter := func(blocks types.Blocks) (int, error) {
//如果快速同步正在运行,则拒绝导入奇怪的块
if atomic.LoadUint32(&manager.fastSync) == 1 {
log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
return 0, nil
}
atomic.StoreUint32(&manager.acceptTxs, 1) //在任何获取器导入上标记初始同步完成
return manager.blockchain.InsertChain(blocks)
}
manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
return manager, nil
}
func (pm *ProtocolManager) removePeer(id string) {
//如果对等点已被删除,则短路
peer := pm.peers.Peer(id)
if peer == nil {
return
}
log.Debug("Removing Ethereum peer", "peer", id)
//从下载程序和以太坊对等集注销对等。
pm.downloader.UnregisterPeer(id)
if err := pm.peers.Unregister(id); err != nil {
log.Error("Peer removal failed", "peer", id, "err", err)
}
//网络层硬断开
if peer != nil {
peer.Peer.Disconnect(p2p.DiscUselessPeer)
}
}
func (pm *ProtocolManager) Start(maxPeers int) {
pm.maxPeers = maxPeers
//广播事务
pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
go pm.txBroadcastLoop()
//广播挖掘块
pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
go pm.minedBroadcastLoop()
//启动同步处理程序
go pm.syncer()
go pm.txsyncLoop()
}
func (pm *ProtocolManager) Stop() {
log.Info("Stopping Ethereum protocol")
pm.txsSub.Unsubscribe() //退出TxBroadcastLoop
pm.minedBlockSub.Unsubscribe() //退出BlockBroadcastLoop
//退出同步循环。
//完成此发送后,将不接受任何新对等方。
pm.noMorePeers <- struct{}{}
//退出获取程序,TxSyncLoop。
close(pm.quitSync)
//断开现有会话。
//这也会关闭对等集上任何新注册的入口。
//已建立但尚未添加到pm.peers的会话
//当他们尝试注册时将退出。
pm.peers.Close()
//等待所有对等处理程序Goroutines和循环出现。
pm.wg.Wait()
log.Info("Ethereum protocol stopped")
}
func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
return newPeer(pv, p, newMeteredMsgWriter(rw))
}
//handle是为管理ETH对等机的生命周期而调用的回调。什么时候?
//此函数终止,对等端断开连接。
func (pm *ProtocolManager) handle(p *peer) error {
//如果这是受信任的对等,则忽略MaxPeers
if pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
return p2p.DiscTooManyPeers
}
p.Log().Debug("Ethereum peer connected", "name", p.Name())
//执行以太坊握手
var (
genesis = pm.blockchain.Genesis()
head = pm.blockchain.CurrentHeader()
hash = head.Hash()
number = head.Number.Uint64()
td = pm.blockchain.GetTd(hash, number)
)
if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {
p.Log().Debug("Ethereum handshake failed", "err", err)
return err
}
if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
rw.Init(p.version)
}
//在本地注册对等机
if err := pm.peers.Register(p); err != nil {
p.Log().Error("Ethereum peer registration failed", "err", err)
return err
}
defer pm.removePeer(p.id)
//在下载程序中注册对等点。如果下载者认为它是被禁止的,我们会断开
if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
return err
}
//传播现有事务。出现新交易
//之后将通过广播发送。
pm.syncTransactions(p)
//如果我们知道DAO Hard Fork,请验证与Hard Fork相关的任何远程对等。
if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
//请求对等端的DAO分叉头进行额外的数据验证
if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
return err
}
//如果对等端没有及时回复,启动计时器断开连接
p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
p.Log().Debug("Timed out DAO fork-check, dropping")
pm.removePeer(p.id)
})
//如果同伴死了,一定要把它清理干净。
defer func() {
if p.forkDrop != nil {
p.forkDrop.Stop()
p.forkDrop = nil
}
}()
}
//主回路。处理传入消息。
for {
if err := pm.handleMsg(p); err != nil {
p.Log().Debug("Ethereum message handling failed", "err", err)
return err
}
}
}
//每当从远程服务器接收到入站消息时,将调用handlemsg。
//同龄人。返回任何错误时,远程连接被断开。
func (pm *ProtocolManager) handleMsg(p *peer) error {
//从远程对等端读取下一条消息,并确保它被完全消耗掉。
msg, err := p.rw.ReadMsg()
if err != nil {
return err
}
if msg.Size > ProtocolMaxMsgSize {
return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
}
defer msg.Discard()
//根据消息的内容处理消息
switch {
case msg.Code == StatusMsg:
//状态消息不应在握手后到达
return errResp(ErrExtraStatusMsg, "uncontrolled status message")
//阻止头查询,收集请求的头并答复
case msg.Code == GetBlockHeadersMsg:
//解码复杂的头查询
var query getBlockHeadersData
if err := msg.Decode(&query); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
hashMode := query.Origin.Hash != (common.Hash{})
first := true
maxNonCanonical := uint64(100)
//收集邮件头,直到达到提取或网络限制
var (
bytes common.StorageSize
headers []*types.Header
unknown bool
)
for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
//检索满足查询的下一个标题
var origin *types.Header
if hashMode {
if first {
first = false
origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
if origin != nil {
query.Origin.Number = origin.Number.Uint64()
}
} else {
origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number)
}
} else {
origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
}
if origin == nil {
break
}
headers = append(headers, origin)
bytes += estHeaderRlpSize
//前进到查询的下一个标题
switch {
case hashMode && query.Reverse:
//向Genesis块的基于哈希的遍历
ancestor := query.Skip + 1
if ancestor == 0 {
unknown = true
} else {
query.Origin.Hash, query.Origin.Number = pm.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical)
unknown = (query.Origin.Hash == common.Hash{})
}
case hashMode && !query.Reverse:
//基于哈希的叶块遍历
var (
current = origin.Number.Uint64()
next = current + query.Skip + 1
)
if next <= current {
infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
unknown = true
} else {
if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
nextHash := header.Hash()
expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
if expOldHash == query.Origin.Hash {
query.Origin.Hash, query.Origin.Number = nextHash, next
} else {
unknown = true
}
} else {
unknown = true
}
}
case query.Reverse:
//朝向Genesis区块的基于数字的遍历
if query.Origin.Number >= query.Skip+1 {
query.Origin.Number -= query.Skip + 1
} else {
unknown = true
}
case !query.Reverse:
//面向叶块的基于数字的遍历
query.Origin.Number += query.Skip + 1
}
}
return p.SendBlockHeaders(headers)
case msg.Code == BlockHeadersMsg:
//我们以前的一个请求收到了一批头文件
var headers []*types.Header
if err := msg.Decode(&headers); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
//如果没有收到报头,但我们要花费一张刀叉支票,也许是这样
if len(headers) == 0 && p.forkDrop != nil {
//可能是对分叉头检查的空回复,健全性检查TDS
verifyDAO := true
//如果我们已经有了一个DAO头,我们可以对照它检查对等端的TD。如果
//同行在前面,它也必须对DAO检查有答复
if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
verifyDAO = false
}
}
//如果我们看起来是在同一条链上,请禁用掉计时。
if verifyDAO {
p.Log().Debug("Seems to be on the same side of the DAO fork")
p.forkDrop.Stop()
p.forkDrop = nil
return nil
}
}
//过滤掉任何显式请求的头,将其余的传递给下载程序
filter := len(headers) == 1
if filter {
//如果这是一个潜在的DAO分叉检查,请根据规则验证
if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
//禁用拨叉下降计时器
p.forkDrop.Stop()
p.forkDrop = nil
//验证头并删除对等端或继续
if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
return err
}
p.Log().Debug("Verified to be on the same side of the DAO fork")
return nil
}
//与fork检查无关,将header发送到gether,以防万一。
headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
}
if len(headers) > 0 || !filter {
err := pm.downloader.DeliverHeaders(p.id, headers)
if err != nil {
log.Debug("Failed to deliver headers", "err", err)
}
}
case msg.Code == GetBlockBodiesMsg:
//解码检索消息
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
if _, err := msgStream.List(); err != nil {
return err
}
//收集块,直到达到获取或网络限制
var (
hash common.Hash
bytes int
bodies []rlp.RawValue
)
for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
//检索下一个块的哈希
if err := msgStream.Decode(&hash); err == rlp.EOL {
break
} else if err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
//检索请求的块体,如果找到足够的块体,则停止
if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 {
bodies = append(bodies, data)
bytes += len(data)
}
}
return p.SendBlockBodiesRLP(bodies)
case msg.Code == BlockBodiesMsg:
//我们以前的一个要求得到了一批尸体。
var request blockBodiesData
if err := msg.Decode(&request); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
//把它们全部送到下载者那里排队
transactions := make([][]*types.Transaction, len(request))
uncles := make([][]*types.Header, len(request))
for i, body := range request {
transactions[i] = body.Transactions
uncles[i] = body.Uncles
}
//过滤掉任何明确要求的主体,把其余的交付给下载者。
filter := len(transactions) > 0 || len(uncles) > 0
if filter {
transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
}
if len(transactions) > 0 || len(uncles) > 0 || !filter {
err := pm.downloader.DeliverBodies(p.id, transactions, uncles)
if err != nil {
log.Debug("Failed to deliver bodies", "err", err)
}
}
case p.version >= eth63 && msg.Code == GetNodeDataMsg:
//解码检索消息
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
if _, err := msgStream.List(); err != nil {
return err
}
//收集状态数据,直到达到提取或网络限制
var (
hash common.Hash
bytes int
data [][]byte
)
for bytes < softResponseLimit && len(data) < downloader.MaxStateFetch {
//检索下一个状态项的哈希
if err := msgStream.Decode(&hash); err == rlp.EOL {
break
} else if err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
//检索请求的状态项,如果找到足够的,则停止
if entry, err := pm.blockchain.TrieNode(hash); err == nil {
data = append(data, entry)
bytes += len(entry)
}
}
return p.SendNodeData(data)
case p.version >= eth63 && msg.Code == NodeDataMsg:
//一批节点状态数据到达了我们以前的一个请求
var data [][]byte
if err := msg.Decode(&data); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
//将所有内容发送到下载程序
if err := pm.downloader.DeliverNodeData(p.id, data); err != nil {
log.Debug("Failed to deliver node state data", "err", err)
}
case p.version >= eth63 && msg.Code == GetReceiptsMsg:
//解码检索消息
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
if _, err := msgStream.List(); err != nil {
return err
}
//收集状态数据,直到达到提取或网络限制
var (
hash common.Hash
bytes int
receipts []rlp.RawValue
)
for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptFetch {
//检索下一个块的哈希
if err := msgStream.Decode(&hash); err == rlp.EOL {
break
} else if err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
//检索请求块的收据,如果我们不知道,则跳过
results := pm.blockchain.GetReceiptsByHash(hash)
if results == nil {
if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
continue
}
}
//如果已知,则对响应数据包进行编码和排队
if encoded, err := rlp.EncodeToBytes(results); err != nil {
log.Error("Failed to encode receipt", "err", err)
} else {
receipts = append(receipts, encoded)
bytes += len(encoded)
}
}
return p.SendReceiptsRLP(receipts)
case p.version >= eth63 && msg.Code == ReceiptsMsg:
//我们以前的一个请求收到了一批收据
var receipts [][]*types.Receipt
if err := msg.Decode(&receipts); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
//将所有内容发送到下载程序
if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil {
log.Debug("Failed to deliver receipts", "err", err)
}
case msg.Code == NewBlockHashesMsg:
var announces newBlockHashesData
if err := msg.Decode(&announces); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
//将哈希标记为在远程节点上存在
for _, block := range announces {
p.MarkBlock(block.Hash)
}
//计划所有未知哈希以便检索
unknown := make(newBlockHashesData, 0, len(announces))
for _, block := range announces {
if !pm.blockchain.HasBlock(block.Hash, block.Number) {
unknown = append(unknown, block)
}
}
for _, block := range unknown {
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
}
case msg.Code == NewBlockMsg:
//检索并解码传播的块
var request newBlockData
if err := msg.Decode(&request); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
request.Block.ReceivedAt = msg.ReceivedAt
request.Block.ReceivedFrom = p
//将对等机标记为拥有该块并计划导入
p.MarkBlock(request.Block.Hash())
pm.fetcher.Enqueue(p.id, request.Block)
//假设该块可由对等机导入,但可能尚未导入,
//计算对等端真正必须拥有的头散列和td。
var (
trueHead = request.Block.ParentHash()
trueTD = new(big.Int).Sub(request.TD, request.Block.Difficulty())
)
//如果优于前一个,则更新同龄人的总难度
if _, td := p.Head(); trueTD.Cmp(td) > 0 {
p.SetHead(trueHead, trueTD)
//如果高于我们的,则安排同步。注意,对于间隔为
//一个单独的块(因为真正的td低于传播的块),但是
//场景应该很容易被获取器覆盖。
currentBlock := pm.blockchain.CurrentBlock()
if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
go pm.synchronise(p)
}
}
case msg.Code == TxMsg:
//交易已到达,请确保我们有一个有效且新鲜的链来处理它们。
if atomic.LoadUint32(&pm.acceptTxs) == 0 {
break
}
//可以处理事务,解析所有事务并将其传递到池
var txs []*types.Transaction
if err := msg.Decode(&txs); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
for i, tx := range txs {
//验证并标记远程事务
if tx == nil {
return errResp(ErrDecode, "transaction %d is nil", i)
}
p.MarkTransaction(tx.Hash())
}
pm.txpool.AddRemotes(txs)
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
}
return nil
}
//BroadcastBlock将把一个块传播到它的一个子集的对等端,或者
//只会宣布它的可用性(取决于请求的内容)。
func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
hash := block.Hash()
peers := pm.peers.PeersWithoutBlock(hash)
//如果请求传播,则发送到对等机的一个子集
if propagate {
//计算块的td(尚未导入,因此块.td无效)
var td *big.Int
if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
} else {
log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
return
}
//将块发送到对等节点的一个子集
transfer := peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range transfer {
peer.AsyncSendNewBlock(block, td)
}
log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
return
}
//否则,如果该块确实在外链中,则将其公布。
if pm.blockchain.HasBlock(hash, block.NumberU64()) {
for _, peer := range peers {
peer.AsyncSendNewBlockHash(block)
}
log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
}
}
//BroadcastTXS将把一批事务传播到所有未知的对等端
//已经有给定的事务。
func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
var txset = make(map[*peer]types.Transactions)
//将事务广播给一批不知道它的对等方
for _, tx := range txs {
peers := pm.peers.PeersWithoutTx(tx.Hash())
for _, peer := range peers {
txset[peer] = append(txset[peer], tx)
}
log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
}
//fixme再次包括:peers=peers[:int(math.sqrt(float64(len(peers)))]
for peer, txs := range txset {
peer.AsyncSendTransactions(txs)
}
}
//地雷广播回路
func (pm *ProtocolManager) minedBroadcastLoop() {
//如果取消订阅,则自动停止
for obj := range pm.minedBlockSub.Chan() {
if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
pm.BroadcastBlock(ev.Block, true) //首先将块传播到对等端
pm.BroadcastBlock(ev.Block, false) //然后向其他人宣布
}
}
}
func (pm *ProtocolManager) txBroadcastLoop() {
for {
select {
case event := <-pm.txsCh:
pm.BroadcastTxs(event.Txs)
//err()取消订阅时通道将关闭。
case <-pm.txsSub.Err():
return
}
}
}
//nodeinfo表示以太坊子协议元数据的简短摘要
//了解主机对等机。
type NodeInfo struct {
Network uint64 `json:"network"` //以太坊网络ID(1=前沿,2=现代,Ropsten=3,Rinkeby=4)
Difficulty *big.Int `json:"difficulty"` //主机区块链的总难度
Genesis common.Hash `json:"genesis"` //寄主创世纪区块的沙3哈希
Config *params.ChainConfig `json:"config"` //分叉规则的链配置
Head common.Hash `json:"head"` //主机最好拥有的块的sha3哈希
}
//nodeinfo检索有关正在运行的主机节点的一些协议元数据。
func (pm *ProtocolManager) NodeInfo() *NodeInfo {
currentBlock := pm.blockchain.CurrentBlock()
return &NodeInfo{
Network: pm.networkID,
Difficulty: pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()),
Genesis: pm.blockchain.Genesis().Hash(),
Config: pm.blockchain.Config(),
Head: currentBlock.Hash(),
}
}