bulk.go
package publisher

import (
	"time"

	"github.com/elastic/beats/libbeat/common/op"
	"github.com/elastic/beats/libbeat/outputs"
)
// bulkWorker collects single events and smaller batches into bulk requests of
// at most maxBatchSize events before forwarding them to the output worker.
type bulkWorker struct {
	output worker
	ws     *workerSignal

	queue       chan message
	bulkQueue   chan message
	guaranteed  bool
	flushTicker *time.Ticker

	maxBatchSize int
	data         []outputs.Data // batched events
	pending      []op.Signaler  // pending signalers for batched events
}
// newBulkWorker creates a bulkWorker whose queues are bounded by the given
// high water marks and starts its processing loop in a new goroutine.
func newBulkWorker(
	ws *workerSignal, hwm int, bulkHWM int,
	output worker,
	flushInterval time.Duration,
	maxBatchSize int,
) *bulkWorker {
	b := &bulkWorker{
		output:       output,
		ws:           ws,
		queue:        make(chan message, hwm),
		bulkQueue:    make(chan message, bulkHWM),
		flushTicker:  time.NewTicker(flushInterval),
		maxBatchSize: maxBatchSize,
		data:         make([]outputs.Data, 0, maxBatchSize),
		pending:      nil,
	}

	b.ws.wg.Add(1)
	go b.run()
	return b
}
func (b *bulkWorker) send(m message) {
	send(b.queue, b.bulkQueue, m)
}
// run is the worker's event loop: it batches incoming single events and bulk
// messages, flushes on the ticker, and exits once the shutdown signal fires.
func (b *bulkWorker) run() {
	defer b.shutdown()

	for {
		select {
		case <-b.ws.done:
			return
		case m := <-b.queue:
			b.onEvent(&m.context, m.datum)
		case m := <-b.bulkQueue:
			b.onEvents(&m.context, m.data)
		case <-b.flushTicker.C:
			b.flush()
		}
	}
}
// flush publishes the buffered batch if it contains any events.
func (b *bulkWorker) flush() {
	if len(b.data) > 0 {
		b.publish()
	}
}
// onEvent appends a single event to the batch and publishes once the batch
// buffer is full.
func (b *bulkWorker) onEvent(ctx *Context, data outputs.Data) {
	b.data = append(b.data, data)
	b.guaranteed = b.guaranteed || ctx.Guaranteed

	signal := ctx.Signal
	if signal != nil {
		b.pending = append(b.pending, signal)
	}

	if len(b.data) == cap(b.data) {
		b.publish()
	}
}
// onEvents buffers a batch of events, splitting it up as needed so the
// configured maximum batch size is respected.
func (b *bulkWorker) onEvents(ctx *Context, data []outputs.Data) {
	for len(data) > 0 {
		// Split up the bulk request to match the required batch sizes.
		// If the input events have been split up, bufferFull will be set and
		// the buffered batch will be published.
		spaceLeft := cap(b.data) - len(b.data)
		consume := len(data)
		bufferFull := spaceLeft <= consume
		signal := ctx.Signal
		b.guaranteed = b.guaranteed || ctx.Guaranteed
		if spaceLeft < consume {
			consume = spaceLeft
			if signal != nil {
				// Create a cascading signaler chain for the
				// subset of events being sent.
				signal = op.SplitSignaler(signal, 2)
			}
		}

		// buffer events
		b.data = append(b.data, data[:consume]...)
		data = data[consume:]
		if signal != nil {
			b.pending = append(b.pending, signal)
		}

		if bufferFull {
			b.publish()
		}
	}
}
// publish forwards the buffered batch to the output worker, combining the
// pending signalers and the guaranteed flag, then resets the batch state.
func (b *bulkWorker) publish() {
	b.output.send(message{
		context: Context{
			publishOptions: publishOptions{Guaranteed: b.guaranteed},
			Signal:         op.CombineSignalers(b.pending...),
		},
		data: b.data,
	})

	b.pending = nil
	b.guaranteed = false
	b.data = make([]outputs.Data, 0, b.maxBatchSize)
}
// shutdown stops the flush ticker, stops both input queues, and marks the
// worker as done on the shared wait group.
func (b *bulkWorker) shutdown() {
	b.flushTicker.Stop()
	stopQueue(b.queue)
	stopQueue(b.bulkQueue)
	b.ws.wg.Done()
}