-
Notifications
You must be signed in to change notification settings - Fork 1
/
ipc_peer.go
144 lines (121 loc) · 3.29 KB
/
ipc_peer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package bdls
import (
"crypto/ecdsa"
fmt "fmt"
math "math"
rand "math/rand"
"net"
"sync"
"time"
"unsafe"
"github.com/yonggewang/bdls/timer"
)
// fakeAddress is a string-backed address value handed out by IPCPeer, which
// has no real network endpoint to report.
type fakeAddress string

// Network reports the fixed pseudo-network name "ipc".
func (fakeAddress) Network() string {
	const network = "ipc"
	return network
}

// String yields the underlying address text unchanged.
func (addr fakeAddress) String() string {
	return string(addr)
}
// IPCPeer represents an in-process peer for testing. Messages are handed to
// the wrapped Consensus by direct function call after an artificial delay, so
// a variety of network latencies can be emulated. Each delay is drawn from a
// normal distribution centred on the configured latency with a standard
// deviation of one tenth of it (see delay).
type IPCPeer struct {
	// c is the consensus instance that receives proposals, messages and
	// periodic updates from this peer.
	c *Consensus

	// The embedded mutex is locked by the peer's methods around access to
	// the counters below and around calls into c.
	sync.Mutex

	// latency is the base delay given to NewIPCPeer.
	latency time.Duration

	// die is closed by Close; Update stops rescheduling itself once it is
	// closed.
	die chan struct{}

	// dieOnce ensures die is closed at most once.
	dieOnce sync.Once

	// msgCount is the number of messages delivered to c so far.
	msgCount int64

	// bytesCount is the total length in bytes of the delivered messages.
	bytesCount int64

	// minLatency is the smallest delay generated for a delivered message.
	minLatency time.Duration

	// maxLatency is the largest delay generated for a delivered message.
	maxLatency time.Duration

	// totalLatency is the sum of the delays of all delivered messages.
	totalLatency time.Duration
}
// NewIPCPeer builds an in-process peer that forwards to c. The latency
// argument is the centre of the per-message delay distribution; individual
// delays are normally distributed around it.
//
// minLatency starts at the largest representable duration so that the first
// delivered message always becomes the observed minimum.
func NewIPCPeer(c *Consensus, latency time.Duration) *IPCPeer {
	return &IPCPeer{
		c:          c,
		latency:    latency,
		die:        make(chan struct{}),
		minLatency: math.MaxInt64,
	}
}
// GetPublicKey returns the peer's identity: a pointer to the public part of
// the private key held by the wrapped consensus instance.
func (p *IPCPeer) GetPublicKey() *ecdsa.PublicKey {
	consensus := p.c
	return &consensus.privateKey.PublicKey
}
// RemoteAddr implements Peer.RemoteAddr. An in-process peer has no network
// endpoint, so the address text is the formatted memory address of p.
func (p *IPCPeer) RemoteAddr() net.Addr {
	self := unsafe.Pointer(p)
	return fakeAddress(fmt.Sprint(self))
}
// GetMessageCount reports how many messages have been delivered to this peer
// so far. The counter is read under the peer's lock.
func (p *IPCPeer) GetMessageCount() int64 {
	p.Lock()
	delivered := p.msgCount
	p.Unlock()
	return delivered
}
// GetBytesCount reports the total size in bytes of all messages delivered to
// this peer so far. The counter is read under the peer's lock.
func (p *IPCPeer) GetBytesCount() int64 {
	p.Lock()
	received := p.bytesCount
	p.Unlock()
	return received
}
// Propose submits state s to the wrapped consensus instance, to be finalized
// at the next height. The call is made while holding the peer's lock.
func (p *IPCPeer) Propose(s State) {
	p.Lock()
	defer p.Unlock()

	consensus := p.c
	consensus.Propose(s)
}
// GetLatestState returns the height, round and state data currently reported
// by the wrapped consensus instance. The query runs under the peer's lock.
func (p *IPCPeer) GetLatestState() (height uint64, round uint64, data State) {
	p.Lock()
	defer p.Unlock()

	height, round, data = p.c.CurrentState()
	return height, round, data
}
// GetLatencies returns the smallest, largest and summed delay actually
// generated for the messages delivered so far. For a peer built with
// NewIPCPeer, min stays at the maximum duration until a message is delivered.
func (p *IPCPeer) GetLatencies() (min time.Duration, max time.Duration, total time.Duration) {
	p.Lock()
	min, max, total = p.minLatency, p.maxLatency, p.totalLatency
	p.Unlock()
	return min, max, total
}
// Send implements Peer.Send. It never delivers synchronously: a delay is
// generated, and a callback that records the statistics and passes msg to the
// consensus instance is registered with timer.SystemTimedSched for
// now + delay. Send itself always returns nil.
//
// msg is not copied; the callback keeps a reference to it until it has run.
func (p *IPCPeer) Send(msg []byte) error {
	lag := p.delay()

	deliver := func() {
		p.Lock()
		defer p.Unlock()

		// Track the extremes and the running sum of generated delays.
		if lag < p.minLatency {
			p.minLatency = lag
		}
		if lag > p.maxLatency {
			p.maxLatency = lag
		}
		p.totalLatency += lag

		// Traffic counters.
		p.msgCount++
		p.bytesCount += int64(len(msg))

		// Errors from the consensus core are intentionally dropped here.
		_ = p.c.ReceiveMessage(msg, time.Now())
	}

	deadline := time.Now().Add(lag)
	timer.SystemTimedSched.Put(deliver, deadline)
	return nil
}
// delay draws one message delay from a normal distribution whose mean is
// p.latency and whose standard deviation is one tenth of p.latency.
func (p *IPCPeer) delay() time.Duration {
	base := p.latency
	jitter := 0.1 * rand.NormFloat64() * float64(base)
	return time.Duration(jitter) + base
}
// Update drives the consensus instance and re-registers itself with
// timer.SystemTimedSched 20ms into the future, so it keeps being called
// periodically. Once the peer has been closed it returns without doing
// either.
func (p *IPCPeer) Update() {
	p.Lock()
	defer p.Unlock()

	// Stop the cycle as soon as Close has been called.
	select {
	case <-p.die:
		return
	default:
	}

	// The result of the consensus update is deliberately ignored.
	_ = p.c.Update(time.Now())

	next := time.Now().Add(20 * time.Millisecond)
	timer.SystemTimedSched.Put(p.Update, next)
}
// Close shuts the peer down by closing its die channel. It may be called any
// number of times; the channel is closed only on the first call.
func (p *IPCPeer) Close() {
	shutdown := func() { close(p.die) }
	p.dieOnce.Do(shutdown)
}