/
peer.go
202 lines (173 loc) · 4.47 KB
/
peer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
// Copyright (c) 2018 The VeChainThor developers
// Distributed under the GNU Lesser General Public License v3.0 software license, see the accompanying
// file LICENSE or <https://www.gnu.org/licenses/lgpl-3.0.html>
package comm
import (
"math/rand"
"sync"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
lru "github.com/hashicorp/golang-lru"
"github.com/inconshreveable/log15"
"github.com/vechain/thor/p2psrv/rpc"
"github.com/vechain/thor/thor"
)
// Capacity limits of the per-peer "known" caches. Bounding them keeps a
// remote peer from growing our memory without limit (DoS protection).
const (
	// maxKnownTxs is the maximum number of transaction IDs kept in the known list.
	maxKnownTxs = 1 << 16 // 65536
	// maxKnownBlocks is the maximum number of block IDs kept in the known list.
	maxKnownBlocks = 1 << 10 // 1024
)
// init seeds the global math/rand source with the current wall-clock time in
// nanoseconds, so the random values drawn elsewhere in this file differ
// between runs.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
// Peer extends p2p.Peer with RPC integrated.
//
// Besides the embedded connection and RPC endpoint, it tracks which
// transactions and blocks the remote side is known to have, and the best
// head block reported for it.
type Peer struct {
*p2p.Peer
*rpc.RPC
// logger is created in newPeer with "peer" and "dir" context fields.
logger log15.Logger
// createdTime is the monotonic timestamp taken when the Peer was created;
// Duration measures against it.
createdTime mclock.AbsTime
// knownTxs is an LRU cache keyed by transaction hash; MarkTransaction
// stores an expiry deadline (mclock.AbsTime) as the value.
knownTxs *lru.Cache
// knownBlocks is an LRU cache keyed by block ID; MarkBlock stores an empty
// struct as the value.
knownBlocks *lru.Cache
// head holds the head block ID and total score of this peer. The embedded
// mutex is taken by Head and UpdateHead around every access.
head struct {
sync.Mutex
id thor.Bytes32
totalScore uint64
}
}
// newPeer wraps a p2p peer and its message stream into a Peer.
//
// The returned Peer carries a logger tagged with the peer and its connection
// direction ("inbound" or "outbound"), the creation timestamp, and empty
// known-transaction / known-block caches.
func newPeer(peer *p2p.Peer, rw p2p.MsgReadWriter) *Peer {
	var direction string
	if peer.Inbound() {
		direction = "inbound"
	} else {
		direction = "outbound"
	}

	// Errors are deliberately ignored: lru.New only fails for a non-positive
	// size, and both sizes are positive constants.
	txCache, _ := lru.New(maxKnownTxs)
	blockCache, _ := lru.New(maxKnownBlocks)

	return &Peer{
		Peer:        peer,
		RPC:         rpc.New(peer, rw),
		logger:      log.New("peer", peer, "dir", direction),
		createdTime: mclock.Now(),
		knownTxs:    txCache,
		knownBlocks: blockCache,
	}
}
// Head returns the ID and total score of the peer's head block, as last
// recorded by UpdateHead. Both values are read under the head lock.
func (p *Peer) Head() (id thor.Bytes32, totalScore uint64) {
	p.head.Lock()
	defer p.head.Unlock()

	id, totalScore = p.head.id, p.head.totalScore
	return id, totalScore
}
// UpdateHead records a new head block ID and total score for the peer.
// The update is applied only when totalScore is strictly greater than the
// currently recorded one; otherwise the call is a no-op.
func (p *Peer) UpdateHead(id thor.Bytes32, totalScore uint64) {
	p.head.Lock()
	defer p.head.Unlock()

	if totalScore <= p.head.totalScore {
		// Not better than what we already have.
		return
	}
	p.head.id = id
	p.head.totalScore = totalScore
}
// MarkTransaction marks a transaction as known to the peer.
//
// The mark expires after a random lifetime of 10 to 100 block intervals;
// the expiry deadline is stored as the cache value and checked by
// IsTransactionKnown.
func (p *Peer) MarkTransaction(hash thor.Bytes32) {
	// rand.Intn(91) yields 0..90, so the result is 10..100 intervals.
	intervals := uint64(rand.Intn(91) + 10)
	lifetime := time.Second * time.Duration(thor.BlockInterval*intervals)

	expiresAt := mclock.Now() + mclock.AbsTime(lifetime)
	p.knownTxs.Add(hash, expiresAt)
}
// MarkBlock marks a block as known to the peer. Only the presence of the ID
// in the cache matters, so an empty struct is stored as the value.
func (p *Peer) MarkBlock(id thor.Bytes32) {
	var marker struct{}
	p.knownBlocks.Add(id, marker)
}
// IsTransactionKnown reports whether the transaction is marked as known to
// the peer and that mark has not yet expired.
func (p *Peer) IsTransactionKnown(hash thor.Bytes32) bool {
	value, found := p.knownTxs.Get(hash)
	if found {
		// The cached value is the deadline stored by MarkTransaction.
		expiresAt := value.(mclock.AbsTime)
		return expiresAt > mclock.Now()
	}
	return false
}
// IsBlockKnown reports whether the block ID is present in the peer's
// known-block cache.
func (p *Peer) IsBlockKnown(id thor.Bytes32) bool {
	known := p.knownBlocks.Contains(id)
	return known
}
// Duration returns how long the peer has existed, measured on the monotonic
// clock from the moment it was created.
func (p *Peer) Duration() mclock.AbsTime {
	now := mclock.Now()
	return now - p.createdTime
}
// Peers is a slice of peers. It provides the Filter and Find helpers for
// selecting peers by a caller-supplied condition.
type Peers []*Peer
// Filter returns the subset of peers that satisfy cond, in their original
// order. The result is always a new, non-nil slice; ps is not modified.
func (ps Peers) Filter(cond func(*Peer) bool) Peers {
	matched := make(Peers, 0, len(ps))
	for i := range ps {
		if candidate := ps[i]; cond(candidate) {
			matched = append(matched, candidate)
		}
	}
	return matched
}
// Find returns the first peer that satisfies cond, or nil when none does.
func (ps Peers) Find(cond func(*Peer) bool) *Peer {
	for i := range ps {
		if candidate := ps[i]; cond(candidate) {
			return candidate
		}
	}
	return nil
}
// PeerSet manages a set of peers, which mapped by NodeID.
// Every method takes the lock, so the set can be shared between goroutines.
type PeerSet struct {
// m maps a node ID to its peer.
m map[discover.NodeID]*Peer
// lock guards m.
lock sync.Mutex
}
// newPeerSet creates an empty PeerSet that is ready for use.
func newPeerSet() *PeerSet {
	set := new(PeerSet)
	set.m = make(map[discover.NodeID]*Peer)
	return set
}
// Add inserts the peer into the set, keyed by its node ID. A peer already
// stored under the same ID is replaced.
func (ps *PeerSet) Add(peer *Peer) {
	ps.lock.Lock()
	defer ps.lock.Unlock()

	nodeID := peer.ID()
	ps.m[nodeID] = peer
}
// Find returns the peer stored for nodeID, or nil when the set has no such
// peer.
func (ps *PeerSet) Find(nodeID discover.NodeID) *Peer {
	ps.lock.Lock()
	defer ps.lock.Unlock()

	if peer, ok := ps.m[nodeID]; ok {
		return peer
	}
	return nil
}
// Remove deletes the peer stored for nodeID and returns it. It returns nil
// when the set has no peer for that ID.
func (ps *PeerSet) Remove(nodeID discover.NodeID) *Peer {
	ps.lock.Lock()
	defer ps.lock.Unlock()

	// A missing key yields a nil peer, and delete on a missing key is a no-op,
	// so no presence check is needed.
	removed := ps.m[nodeID]
	delete(ps.m, nodeID)
	return removed
}
// Slice dumps all peers of the set into a new slice.
// The order of the returned slice is a random permutation, so callers that
// walk it from the front do not always favour the same peers.
func (ps *PeerSet) Slice() Peers {
	ps.lock.Lock()
	defer ps.lock.Unlock()

	count := len(ps.m)
	shuffled := make(Peers, count)
	slots := rand.Perm(count)

	next := 0
	for _, peer := range ps.m {
		// Place each peer at its randomly assigned position.
		shuffled[slots[next]] = peer
		next++
	}
	return shuffled
}
// Len returns the number of peers currently in the set.
func (ps *PeerSet) Len() int {
	ps.lock.Lock()
	size := len(ps.m)
	ps.lock.Unlock()
	return size
}