forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
bulk.go
140 lines (121 loc) · 2.93 KB
/
bulk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package publisher
import (
"time"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/outputs"
)
// bulkWorker collects single events and event bulks from its input
// channels and forwards them to the underlying output worker in batches
// of up to maxBatchSize events. A batch is flushed either when it
// reaches capacity or when the flush ticker fires.
type bulkWorker struct {
	output worker        // downstream worker receiving batched messages
	ws     *workerSignal // shared shutdown signal and wait group

	queue     chan message // input channel for single-event messages
	bulkQueue chan message // input channel for multi-event (bulk) messages

	guaranteed bool // set if any event in the current batch requires guaranteed delivery

	flushTicker *time.Ticker // periodic flush trigger

	maxBatchSize int
	events       []common.MapStr    // batched events
	pending      []outputs.Signaler // pending signalers for batched events
}
// newBulkWorker constructs a bulkWorker with the given queue sizes
// (hwm for single events, bulkHWM for bulks), registers it on the
// worker signal's wait group, and starts its processing loop.
func newBulkWorker(
	ws *workerSignal, hwm int, bulkHWM int,
	output worker,
	flushInterval time.Duration,
	maxBatchSize int,
) *bulkWorker {
	w := &bulkWorker{output: output, ws: ws}
	w.queue = make(chan message, hwm)
	w.bulkQueue = make(chan message, bulkHWM)
	w.flushTicker = time.NewTicker(flushInterval)
	w.maxBatchSize = maxBatchSize
	// pre-size the batch buffer; capacity doubles as the flush threshold
	w.events = make([]common.MapStr, 0, maxBatchSize)
	w.pending = nil

	ws.wg.Add(1)
	go w.run()
	return w
}
// send enqueues a message for batching. Messages carrying an event
// slice go to the bulk queue; single-event messages go to the plain
// event queue.
func (b *bulkWorker) send(m message) {
	if m.events != nil {
		b.bulkQueue <- m
		return
	}
	b.queue <- m
}
// run is the worker loop. It drains both input channels into the batch
// buffer and flushes any buffered events on every ticker tick. The loop
// exits — triggering shutdown — once the shared done channel is closed.
func (b *bulkWorker) run() {
	defer b.shutdown()

	for {
		select {
		case <-b.ws.done:
			return
		case msg := <-b.queue:
			b.onEvent(&msg.context, msg.event)
		case msg := <-b.bulkQueue:
			b.onEvents(&msg.context, msg.events)
		case <-b.flushTicker.C:
			// only publish on tick if there is anything buffered
			if len(b.events) != 0 {
				b.publish()
			}
		}
	}
}
// onEvent appends one event to the current batch, records its
// completion signaler (if any), and publishes the batch once the buffer
// is full.
func (b *bulkWorker) onEvent(ctx *Context, event common.MapStr) {
	b.guaranteed = b.guaranteed || ctx.Guaranteed
	b.events = append(b.events, event)
	if ctx.Signal != nil {
		b.pending = append(b.pending, ctx.Signal)
	}

	// buffer capacity doubles as the batch-size threshold
	if len(b.events) == cap(b.events) {
		b.publish()
	}
}
// onEvents adds a bulk of events to the batch buffer. If the bulk does
// not fit into the remaining buffer space it is split across several
// batches, and each full batch is published immediately. When a split
// occurs, the bulk's signaler is wrapped in a split signaler so the
// original only fires after all sub-batches have been acknowledged.
func (b *bulkWorker) onEvents(ctx *Context, events []common.MapStr) {
	for len(events) > 0 {
		// split up bulk to match required bulk sizes.
		// If input events have been split up bufferFull will be set and
		// bulk request will be published.
		spaceLeft := cap(b.events) - len(b.events)
		consume := len(events)
		// note: "<=" means exactly filling the buffer also triggers a publish
		bufferFull := spaceLeft <= consume
		signal := ctx.Signal
		b.guaranteed = b.guaranteed || ctx.Guaranteed
		if spaceLeft < consume {
			// only part of the bulk fits; consume what fits now and loop
			// for the remainder
			consume = spaceLeft
			if signal != nil {
				// creating cascading signaler chain for
				// subset of events being send
				signal = outputs.NewSplitSignaler(signal, 2)
			}
		}

		// buffer events
		b.events = append(b.events, events[:consume]...)
		events = events[consume:]
		if signal != nil {
			b.pending = append(b.pending, signal)
		}

		if bufferFull {
			b.publish()
		}
	}
}
// publish hands the currently buffered events to the output worker,
// bundling all pending signalers into one composite signaler, then
// resets the batch state for the next round.
func (b *bulkWorker) publish() {
	// TODO: remember/merge and forward context options to output worker
	ctx := Context{
		publishOptions: publishOptions{Guaranteed: b.guaranteed},
		Signal:         outputs.NewCompositeSignaler(b.pending...),
	}
	b.output.send(message{context: ctx, event: nil, events: b.events})

	// reset batch state; a fresh buffer is allocated because the old
	// slice was handed off to the output worker
	b.guaranteed = false
	b.pending = nil
	b.events = make([]common.MapStr, 0, b.maxBatchSize)
}
// shutdown releases the worker's resources when the run loop exits:
// it stops the flush ticker, tears down both input queues via the
// package's stopQueue helper (defined elsewhere in this file —
// presumably it drains/closes the channel; confirm against its
// definition), and signals completion on the shared wait group.
func (b *bulkWorker) shutdown() {
	b.flushTicker.Stop()
	stopQueue(b.queue)
	stopQueue(b.bulkQueue)
	b.ws.wg.Done()
}