forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 9
/
output.go
98 lines (83 loc) · 2.02 KB
/
output.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package pipeline
import (
"github.com/elastic/beats/libbeat/common/atomic"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
)
// clientWorker manages output client of type outputs.Client, not supporting reconnect.
type clientWorker struct {
observer outputObserver
qu workQueue
client outputs.Client
closed atomic.Bool
}
// netClientWorker manages reconnectable output clients of type outputs.NetworkClient.
type netClientWorker struct {
observer outputObserver
qu workQueue
client outputs.NetworkClient
closed atomic.Bool
batchSize int
batchSizer func() int
}
func makeClientWorker(observer outputObserver, qu workQueue, client outputs.Client) outputWorker {
if nc, ok := client.(outputs.NetworkClient); ok {
c := &netClientWorker{observer: observer, qu: qu, client: nc}
go c.run()
return c
}
c := &clientWorker{observer: observer, qu: qu, client: client}
go c.run()
return c
}
func (w *clientWorker) Close() error {
w.closed.Store(true)
return w.client.Close()
}
func (w *clientWorker) run() {
for !w.closed.Load() {
for batch := range w.qu {
w.observer.outBatchSend(len(batch.events))
if err := w.client.Publish(batch); err != nil {
return
}
}
}
}
func (w *netClientWorker) Close() error {
w.closed.Store(true)
return w.client.Close()
}
func (w *netClientWorker) run() {
for !w.closed.Load() {
// start initial connect loop from first batch, but return
// batch to pipeline for other outputs to catch up while we're trying to connect
for batch := range w.qu {
batch.Cancelled()
if w.closed.Load() {
return
}
err := w.client.Connect()
if err != nil {
logp.Err("Failed to connect: %v", err)
continue
}
break
}
// send loop
for batch := range w.qu {
if w.closed.Load() {
if batch != nil {
batch.Cancelled()
}
return
}
err := w.client.Publish(batch)
if err != nil {
logp.Err("Failed to publish events: %v", err)
// on error return to connect loop
break
}
}
}
}