forked from mroth/sseserver
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hub_test.go
133 lines (113 loc) · 3.2 KB
/
hub_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package sseserver
import (
"testing"
"time"
)
// mockHub creates a hub with newHub, starts its run loop on a separate
// goroutine, and registers initialConnections mock connections on the
// "/test" namespace before handing the hub back to the caller.
func mockHub(initialConnections int) (h *hub) {
	h = newHub()
	go h.run()

	// Count down instead of up; a zero or negative request registers nothing.
	for remaining := initialConnections; remaining > 0; remaining-- {
		h.register <- mockConn("/test")
	}
	return h
}
// mockConn returns a connection for the given namespace. Only three fields are
// populated: a send channel buffered to 256 messages, the creation time (now),
// and the namespace itself.
func mockConn(namespace string) *connection {
	c := new(connection)
	c.send = make(chan []byte, 256)
	c.created = time.Now()
	c.namespace = namespace
	return c
}
// deliveryCase pairs a connection with the number of messages a test expects
// to find queued on that connection's send channel.
//
// The tests in this file construct values with positional literals, so the
// field order here must not change.
type deliveryCase struct {
	// conn is the connection whose send queue is inspected.
	conn *connection
	// expected is the number of messages that should be queued on conn.send.
	expected int
}
// TestBroadcastSingleplex verifies that a message broadcast to "/foo" is
// queued for the connection registered on "/foo" and not for the connection
// registered on "/bar".
func TestBroadcastSingleplex(t *testing.T) {
	// Upper bound on how long to wait for the hub goroutine to finish queueing
	// messages before a short queue is treated as a failure.
	const grace = time.Second

	h := mockHub(0)
	c1 := mockConn("/foo")
	c2 := mockConn("/bar")
	h.register <- c1
	h.register <- c2

	// broadcast to foo channel
	h.broadcast <- SSEMessage{"", []byte("yo"), "/foo"}

	// check for proper delivery
	d := []deliveryCase{
		{c1, 1},
		{c2, 0},
	}
	for _, c := range d {
		// mockHub runs the hub on its own goroutine, so a message may still be
		// in flight when the send on h.broadcast returns. Poll briefly instead
		// of reading the queue length exactly once.
		deadline := time.Now().Add(grace)
		for len(c.conn.send) < c.expected && time.Now().Before(deadline) {
			time.Sleep(time.Millisecond)
		}
		if actual := len(c.conn.send); actual != c.expected {
			t.Fatalf("Expected conn to have %d message in queue, actual: %d", c.expected, actual)
		}
	}
}
// TestBroadcastMultiplex verifies that every connection registered on a
// namespace receives each message broadcast to it: two connections on "/foo"
// each get both "/foo" messages, while a connection on "/burrito" gets
// nothing, including the message sent to the unrelated "/bar" namespace.
func TestBroadcastMultiplex(t *testing.T) {
	// Upper bound on how long to wait for the hub goroutine to finish queueing
	// messages before a short queue is treated as a failure.
	const grace = time.Second

	h := mockHub(0)
	c1 := mockConn("/foo")
	c2 := mockConn("/foo")
	c3 := mockConn("/burrito")
	h.register <- c1
	h.register <- c2
	h.register <- c3

	// broadcast to channels
	h.broadcast <- SSEMessage{"", []byte("yo"), "/foo"}
	h.broadcast <- SSEMessage{"", []byte("yo"), "/foo"}
	h.broadcast <- SSEMessage{"", []byte("yo"), "/bar"}

	// check for proper delivery
	d := []deliveryCase{
		{c1, 2},
		{c2, 2},
		{c3, 0},
	}
	for _, c := range d {
		// mockHub runs the hub on its own goroutine, so a message may still be
		// in flight when the send on h.broadcast returns. Poll briefly instead
		// of reading the queue length exactly once.
		deadline := time.Now().Add(grace)
		for len(c.conn.send) < c.expected && time.Now().Before(deadline) {
			time.Sleep(time.Millisecond)
		}
		if actual := len(c.conn.send); actual != c.expected {
			t.Fatalf("Expected conn to have %d message in queue, actual: %d", c.expected, actual)
		}
	}
}
// TestBroadcastWildcards verifies namespace prefix matching: a connection
// registered on "/pets" is expected to receive the messages broadcast to both
// "/pets/dogs" and "/pets/cats", while connections on the specific
// sub-namespaces and on "/kids" each receive only their own message.
func TestBroadcastWildcards(t *testing.T) {
	// Upper bound on how long to wait for the hub goroutine to finish queueing
	// messages before a short queue is treated as a failure.
	const grace = time.Second

	h := mockHub(0)
	cDogs := mockConn("/pets/dogs")
	cCats := mockConn("/pets/cats")
	cWild := mockConn("/pets")
	cOther := mockConn("/kids")
	h.register <- cDogs
	h.register <- cCats
	h.register <- cWild
	h.register <- cOther

	// broadcast to channels
	h.broadcast <- SSEMessage{"", []byte("woof"), "/pets/dogs"}
	h.broadcast <- SSEMessage{"", []byte("meow"), "/pets/cats"}
	h.broadcast <- SSEMessage{"", []byte("wahh"), "/kids"}

	// check for proper delivery
	d := []deliveryCase{
		{cDogs, 1},
		{cCats, 1},
		{cWild, 2},
		{cOther, 1},
	}
	for _, c := range d {
		// mockHub runs the hub on its own goroutine, so a message may still be
		// in flight when the send on h.broadcast returns. Poll briefly instead
		// of reading the queue length exactly once.
		deadline := time.Now().Add(grace)
		for len(c.conn.send) < c.expected && time.Now().Before(deadline) {
			time.Sleep(time.Millisecond)
		}
		if actual := len(c.conn.send); actual != c.expected {
			t.Fatalf("Expected conn to have %d message in queue, actual: %d", c.expected, actual)
		}
	}
}
// benchmarkBroadcast measures broadcasting two messages (one without and one
// with an event name) to a hub holding conns mock connections on "/test".
// Each iteration also receives both messages back from every connection.
func benchmarkBroadcast(conns int, b *testing.B) {
	h := mockHub(conns)

	// Building the hub and registering conns connections is setup, not the
	// work being measured, so exclude it from the reported time.
	b.ResetTimer()

	for n := 0; n < b.N; n++ {
		h.broadcast <- SSEMessage{"", []byte("foo bar woo"), "/test"}
		h.broadcast <- SSEMessage{"event-foo", []byte("foo bar woo"), "/test"}

		// Mock reading the connections. In theory this happens concurrently on
		// another goroutine, but here the send buffers would be exhausted
		// quickly if the reads were not forced: take one message per broadcast
		// above from every connection.
		//
		// NOTE(review): h.connections is ranged here while h.run is live on
		// another goroutine; confirm the hub never mutates the map while
		// handling a broadcast, otherwise this is a data race.
		for c := range h.connections {
			<-c.send
			<-c.send
		}
	}
}
// BenchmarkBroadcast1 runs the broadcast benchmark with 1 registered connection.
func BenchmarkBroadcast1(b *testing.B) {
	benchmarkBroadcast(1, b)
}

// BenchmarkBroadcast10 runs the broadcast benchmark with 10 registered connections.
func BenchmarkBroadcast10(b *testing.B) {
	benchmarkBroadcast(10, b)
}

// BenchmarkBroadcast100 runs the broadcast benchmark with 100 registered connections.
func BenchmarkBroadcast100(b *testing.B) {
	benchmarkBroadcast(100, b)
}

// BenchmarkBroadcast500 runs the broadcast benchmark with 500 registered connections.
func BenchmarkBroadcast500(b *testing.B) {
	benchmarkBroadcast(500, b)
}

// BenchmarkBroadcast1000 runs the broadcast benchmark with 1000 registered connections.
func BenchmarkBroadcast1000(b *testing.B) {
	benchmarkBroadcast(1000, b)
}