-
Notifications
You must be signed in to change notification settings - Fork 0
/
sync.go
112 lines (103 loc) · 2.67 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package gossip
import (
"math/rand"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p/enode"
)
type txsync struct {
p *peer
txids []common.Hash
}
// syncTransactions starts sending all currently pending transactions to the given peer.
func (pm *ProtocolManager) syncTransactions(p *peer, txids []common.Hash) {
if len(txids) == 0 {
return
}
select {
case pm.txsyncCh <- &txsync{p, txids}:
case <-pm.quitSync:
}
}
// txsyncLoop takes care of the initial transaction sync for each new
// connection. When a new peer appears, we relay all currently pending
// transactions. In order to minimise egress bandwidth usage, we send
// the transactions in small packs to one peer at a time.
func (pm *ProtocolManager) txsyncLoop() {
var (
pending = make(map[enode.ID]*txsync)
sending = false // whether a send is active
pack = new(txsync) // the pack that is being sent
done = make(chan error, 1) // result of the send
)
// send starts a sending a pack of transactions from the sync.
send := func(s *txsync) {
// Fill pack with transactions up to the target size.
pack.p = s.p
pack.txids = pack.txids[:0]
for i := 0; i < len(s.txids) && len(pack.txids) < softLimitItems; i++ {
pack.txids = append(pack.txids, s.txids[i])
}
// Remove the transactions that will be sent.
s.txids = s.txids[len(pack.txids):]
if len(s.txids) == 0 {
delete(pending, s.p.ID())
}
// Send the pack in the background.
s.p.Log().Trace("Sending batch of transaction hashes", "count", len(pack.txids))
sending = true
go func() {
if len(pack.txids) != 0 {
done <- pack.p.SendTransactionHashes(pack.txids)
} else {
done <- nil
}
}()
}
// pick chooses the next pending sync.
pick := func() *txsync {
if len(pending) == 0 {
return nil
}
n := rand.Intn(len(pending)) + 1
for _, s := range pending {
if n--; n == 0 {
return s
}
}
return nil
}
for {
select {
case s := <-pm.txsyncCh:
pending[s.p.ID()] = s
if !sending {
send(s)
}
case err := <-done:
sending = false
// Stop tracking peers that cause send failures.
if err != nil {
pack.p.Log().Debug("Transaction send failed", "err", err)
delete(pending, pack.p.ID())
}
// Schedule the next send.
if s := pick(); s != nil {
send(s)
}
case <-pm.quitSync:
return
}
}
}
// syncer is responsible for periodically synchronising with the network, both
// downloading hashes and events as well as handling the announcement handler.
func (pm *ProtocolManager) syncer() {
// Start and ensure cleanup of sync mechanisms
for {
select {
case <-pm.newPeerCh:
case <-pm.noMorePeers:
return
}
}
}