forked from DataDog/datadog-agent
-
Notifications
You must be signed in to change notification settings - Fork 0
/
multi.go
77 lines (66 loc) · 1.83 KB
/
multi.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package writer
import (
"sync"
"github.com/ninnemana/datadog-agent/pkg/trace/writer/config"
)
// Compile-time assertion that *multiSender can be used as a payloadSender.
// The value is assigned to the blank identifier and never used at runtime.
var _ payloadSender = (*multiSender)(nil)
// multiSender is an implementation of payloadSender which forwards any
// received payload to multiple payloadSenders, funnelling incoming monitor
// events from all of them into a single channel.
//
// Start launches one forwarding goroutine per wrapped sender; Stop waits for
// those goroutines and then closes the funnel channel.
type multiSender struct {
senders []payloadSender // wrapped senders; every call is forwarded to each of them
mwg sync.WaitGroup // monitor funnel waitgroup; tracks the forwarding goroutines launched by Start
mch chan monitorEvent // monitor funneling channel; returned by Monitor and closed by Stop
}
// newMultiSender builds a payloadSender for the given endpoints, creating
// each underlying sender with newSender and the supplied configuration.
//
// When exactly one endpoint is given, the sender created for it is returned
// directly. In every other case a multiSender is returned which wraps one
// sender per endpoint and owns a funnel channel buffered to the number of
// endpoints.
func newMultiSender(endpoints []endpoint, cfg config.QueuablePayloadSenderConf) payloadSender {
	count := len(endpoints)
	if count == 1 {
		return newSender(endpoints[0], cfg)
	}
	ms := &multiSender{
		senders: make([]payloadSender, 0, count),
		mch:     make(chan monitorEvent, count),
	}
	for _, ep := range endpoints {
		ms.senders = append(ms.senders, newSender(ep, cfg))
	}
	return ms
}
// Start calls Start on every wrapped sender and then launches one goroutine
// per sender which copies events from that sender's Monitor channel into the
// shared funnel channel. Each goroutine is registered with the funnel
// waitgroup and returns once its source channel has been closed.
func (w *multiSender) Start() {
	for i := range w.senders {
		w.senders[i].Start()
	}

	// forward drains a single monitor channel into the funnel channel.
	forward := func(src <-chan monitorEvent) {
		defer w.mwg.Done()
		for ev := range src {
			w.mch <- ev
		}
	}

	for i := range w.senders {
		w.mwg.Add(1)
		// Monitor() is evaluated here, in the calling goroutine, before the
		// forwarding goroutine is started.
		go forward(w.senders[i].Monitor())
	}
}
// Stop calls Stop on every wrapped sender, waits for the forwarding
// goroutines launched by Start to return, and then closes the funnel channel
// returned by Monitor.
func (w *multiSender) Stop() {
	for i := range w.senders {
		w.senders[i].Stop()
	}
	// Wait before closing: the forwarding goroutines send on w.mch, and a
	// send on a closed channel panics.
	w.mwg.Wait()
	close(w.mch)
}
// Send passes the same payload pointer to each wrapped sender, in slice
// order.
func (w *multiSender) Send(p *payload) {
	for i := range w.senders {
		w.senders[i].Send(p)
	}
}
// Monitor returns the receive-only view of the funnel channel into which the
// monitor events of all wrapped senders are forwarded.
func (w *multiSender) Monitor() <-chan monitorEvent {
	return w.mch
}
// Run implements payloadSender. It does nothing for a multiSender.
func (w *multiSender) Run() {
	// Intentionally empty.
}
// setEndpoint applies the given endpoint to every wrapped sender, so that
// all of them receive the same endpoint value.
func (w *multiSender) setEndpoint(endpoint endpoint) {
	for i := range w.senders {
		w.senders[i].setEndpoint(endpoint)
	}
}