forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
batch.go
90 lines (73 loc) · 1.48 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package pipeline
import (
"sync"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/queue"
)
// Batch pairs a batch taken from the queue with the pipeline state needed to
// acknowledge, drop, or retry its events. Instances are recycled through
// batchPool, so a Batch must not be touched after ACK or Drop.
type Batch struct {
	// original is the underlying queue batch; it is ACKed when this Batch is
	// ACKed or dropped.
	original queue.Batch
	// ctx provides the observer and retryer this batch reports to.
	ctx *batchContext
	// ttl is stored as passed to newBatch; nothing in this file reads it.
	// NOTE(review): presumably a retry budget — confirm against the retryer.
	ttl int
	// events are the events still pending. They start as original.Events()
	// and are replaced by RetryEvents/CancelledEvents.
	events []publisher.Event
}
// batchContext bundles the collaborators a Batch calls into: the observer
// that receives ACK counts and the retryer that receives retried or
// cancelled batches.
type batchContext struct {
	observer *observer
	retryer  *retryer
}
// batchPool recycles Batch values so that newBatch does not have to allocate
// a fresh one for every queue batch.
var batchPool = sync.Pool{
	New: func() interface{} {
		return new(Batch)
	},
}
// newBatch takes a Batch from batchPool and initializes it to wrap original.
// The pending events are seeded from original.Events().
//
// It panics with "empty batch" when original is nil.
func newBatch(ctx *batchContext, original queue.Batch, ttl int) *Batch {
	if original == nil {
		panic("empty batch")
	}

	b := batchPool.Get().(*Batch)
	// Every field is assigned here, so no state from a previous use of the
	// pooled value can leak through.
	b.original = original
	b.ctx = ctx
	b.ttl = ttl
	b.events = original.Events()
	return b
}
// releaseBatch resets b to its zero value and hands it back to batchPool.
// Zeroing first drops the references to the queue batch, context, and events
// while the value sits in the pool.
func releaseBatch(b *Batch) {
	var zero Batch
	*b = zero
	batchPool.Put(b)
}
// Events returns the events currently pending in the batch. The slice is
// returned as stored, not copied.
func (b *Batch) Events() []publisher.Event {
	return b.events
}
// ACK reports all pending events to the observer as ACKed, acknowledges the
// original queue batch, and returns b to the pool. b must not be used after
// this call.
func (b *Batch) ACK() {
	acked := len(b.events)
	b.ctx.observer.outBatchACKed(acked)
	b.original.ACK()
	releaseBatch(b)
}
// Drop acknowledges the original queue batch without reporting anything to
// the observer, then returns b to the pool. b must not be used after this
// call.
func (b *Batch) Drop() {
	// The queue batch is still ACKed so the queue can let go of it; only the
	// observer notification is skipped compared to ACK.
	b.original.ACK()
	releaseBatch(b)
}
// Retry hands the batch, with its current pending events, to the retryer.
func (b *Batch) Retry() {
	r := b.ctx.retryer
	r.retry(b)
}
// Cancelled passes the batch to the retryer's cancelled handler.
// NOTE(review): how cancelled differs from retry is decided by the retryer,
// which is not visible in this file.
func (b *Batch) Cancelled() {
	r := b.ctx.retryer
	r.cancelled(b)
}
// RetryEvents replaces the pending events with events and then retries the
// batch. If events is shorter than the current pending set, the difference
// is reported to the observer as ACKed before the retry.
func (b *Batch) RetryEvents(events []publisher.Event) {
	b.updEvents(events)
	b.Retry()
}
// CancelledEvents replaces the pending events with events and then marks the
// batch as cancelled. If events is shorter than the current pending set, the
// difference is reported to the observer as ACKed first.
func (b *Batch) CancelledEvents(events []publisher.Event) {
	b.updEvents(events)
	b.Cancelled()
}
// updEvents swaps the pending events for events. When the new slice is
// shorter than the old one, the events that fell away are no longer going to
// be retried, so their count is reported to the observer as ACKed.
func (b *Batch) updEvents(events []publisher.Event) {
	if acked := len(b.events) - len(events); acked > 0 {
		b.ctx.observer.outBatchACKed(acked)
	}
	b.events = events
}