-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
require.go
160 lines (136 loc) · 4.74 KB
/
require.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package p2ptest
import (
"testing"
"time"
"github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/internal/p2p"
"github.com/tendermint/tendermint/types"
)
// RequireEmpty requires that the given channel is empty.
func RequireEmpty(t *testing.T, channels ...*p2p.Channel) {
for _, channel := range channels {
select {
case e := <-channel.In:
require.Fail(t, "unexpected message", "channel %v should be empty, got %v", channel.ID, e)
case <-time.After(10 * time.Millisecond):
}
}
}
// RequireReceive requires that the given envelope is received on the channel.
func RequireReceive(t *testing.T, channel *p2p.Channel, expect p2p.Envelope) {
timer := time.NewTimer(time.Second) // not time.After due to goroutine leaks
defer timer.Stop()
select {
case e, ok := <-channel.In:
require.True(t, ok, "channel %v is closed", channel.ID)
require.Equal(t, expect, e)
case <-channel.Done():
require.Fail(t, "channel %v is closed", channel.ID)
case <-timer.C:
require.Fail(t, "timed out waiting for message", "%v on channel %v", expect, channel.ID)
}
}
// RequireReceiveUnordered requires that the given envelopes are all received on
// the channel, ignoring order.
func RequireReceiveUnordered(t *testing.T, channel *p2p.Channel, expect []p2p.Envelope) {
timer := time.NewTimer(time.Second) // not time.After due to goroutine leaks
defer timer.Stop()
actual := []p2p.Envelope{}
for {
select {
case e, ok := <-channel.In:
require.True(t, ok, "channel %v is closed", channel.ID)
actual = append(actual, e)
if len(actual) == len(expect) {
require.ElementsMatch(t, expect, actual)
return
}
case <-channel.Done():
require.Fail(t, "channel %v is closed", channel.ID)
case <-timer.C:
require.ElementsMatch(t, expect, actual)
return
}
}
}
// RequireSend requires that the given envelope is sent on the channel.
func RequireSend(t *testing.T, channel *p2p.Channel, envelope p2p.Envelope) {
timer := time.NewTimer(time.Second) // not time.After due to goroutine leaks
defer timer.Stop()
select {
case channel.Out <- envelope:
case <-timer.C:
require.Fail(t, "timed out sending message", "%v on channel %v", envelope, channel.ID)
}
}
// RequireSendReceive requires that a given Protobuf message is sent to the
// given peer, and then that the given response is received back.
func RequireSendReceive(
t *testing.T,
channel *p2p.Channel,
peerID types.NodeID,
send proto.Message,
receive proto.Message,
) {
RequireSend(t, channel, p2p.Envelope{To: peerID, Message: send})
RequireReceive(t, channel, p2p.Envelope{From: peerID, Message: send})
}
// RequireNoUpdates requires that a PeerUpdates subscription is empty.
func RequireNoUpdates(t *testing.T, peerUpdates *p2p.PeerUpdates) {
t.Helper()
select {
case update := <-peerUpdates.Updates():
require.Fail(t, "unexpected peer updates", "got %v", update)
default:
}
}
// RequireError requires that the given peer error is submitted for a peer.
func RequireError(t *testing.T, channel *p2p.Channel, peerError p2p.PeerError) {
timer := time.NewTimer(time.Second) // not time.After due to goroutine leaks
defer timer.Stop()
select {
case channel.Error <- peerError:
case <-timer.C:
require.Fail(t, "timed out reporting error", "%v on %v", peerError, channel.ID)
}
}
// RequireUpdate requires that a PeerUpdates subscription yields the given update.
func RequireUpdate(t *testing.T, peerUpdates *p2p.PeerUpdates, expect p2p.PeerUpdate) {
timer := time.NewTimer(time.Second) // not time.After due to goroutine leaks
defer timer.Stop()
select {
case update := <-peerUpdates.Updates():
require.Equal(t, expect.NodeID, update.NodeID, "node id did not match")
require.Equal(t, expect.Status, update.Status, "statuses did not match")
case <-peerUpdates.Done():
require.Fail(t, "peer updates subscription is closed")
case <-timer.C:
require.Fail(t, "timed out waiting for peer update", "expected %v", expect)
}
}
// RequireUpdates requires that a PeerUpdates subscription yields the given updates
// in the given order.
func RequireUpdates(t *testing.T, peerUpdates *p2p.PeerUpdates, expect []p2p.PeerUpdate) {
timer := time.NewTimer(time.Second) // not time.After due to goroutine leaks
defer timer.Stop()
actual := []p2p.PeerUpdate{}
for {
select {
case update := <-peerUpdates.Updates():
actual = append(actual, update)
if len(actual) == len(expect) {
for idx := range expect {
require.Equal(t, expect[idx].NodeID, actual[idx].NodeID)
require.Equal(t, expect[idx].Status, actual[idx].Status)
}
return
}
case <-peerUpdates.Done():
require.Fail(t, "peer updates subscription is closed")
case <-timer.C:
require.Equal(t, expect, actual, "did not receive expected peer updates")
return
}
}
}