-
Notifications
You must be signed in to change notification settings - Fork 77
/
broadcaster.go
80 lines (72 loc) · 1.57 KB
/
broadcaster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package rpcbroadcaster
import (
"time"
"go.uber.org/zap"
)
// RPCBroadcaster represents a generic RPC broadcaster.
type RPCBroadcaster struct {
Clients map[string]*RPCClient
Log *zap.Logger
Responses chan []interface{}
close chan struct{}
finished chan struct{}
sendTimeout time.Duration
}
// NewRPCBroadcaster returns a new RPC broadcaster instance.
func NewRPCBroadcaster(log *zap.Logger, sendTimeout time.Duration) *RPCBroadcaster {
return &RPCBroadcaster{
Clients: make(map[string]*RPCClient),
Log: log,
close: make(chan struct{}),
finished: make(chan struct{}),
Responses: make(chan []interface{}),
sendTimeout: sendTimeout,
}
}
// Run implements oracle.Broadcaster.
func (r *RPCBroadcaster) Run() {
for _, c := range r.Clients {
go c.run()
}
run:
for {
select {
case <-r.close:
break run
case ps := <-r.Responses:
for _, c := range r.Clients {
select {
case c.responses <- ps:
default:
c.log.Error("can't send response, channel is full")
}
}
}
}
for _, c := range r.Clients {
<-c.finished
}
drain:
for {
select {
case <-r.Responses:
default:
break drain
}
}
close(r.Responses)
close(r.finished)
}
// SendParams sends a request using all clients if the broadcaster is active.
func (r *RPCBroadcaster) SendParams(params []interface{}) {
select {
case <-r.close:
case r.Responses <- params:
}
}
// Shutdown implements oracle.Broadcaster. The same instance can't be Run again
// after the shutdown.
func (r *RPCBroadcaster) Shutdown() {
close(r.close)
<-r.finished
}