forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
queue.go
90 lines (76 loc) · 3.2 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package queue
import (
"io"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/publisher"
)
// Factory for creating a queue used by a pipeline instance.
type Factory func(Eventer, *common.Config) (Queue, error)
// Eventer listens to special events to be send by queue implementations.
type Eventer interface {
OnACK(int) // number of consecutively published messages, acked by producers
}
// Queue is responsible for accepting, forwarding and ACKing events.
// A queue will receive and buffer single events from its producers.
// Consumers will receive events in batches from the queues buffers.
// Once a consumer has finished processing a batch, it must ACK the batch, for
// the queue to advance its buffers. Events in progress or ACKed are not readable
// from the queue.
// When the queue decides it is safe to progress (events have been ACKed by
// consumer or flush to some other intermediate storage), it will send an ACK signal
// with the number of ACKed events to the Producer (ACK happens in batches).
type Queue interface {
io.Closer
BufferConfig() BufferConfig
Producer(cfg ProducerConfig) Producer
Consumer() Consumer
}
// BufferConfig returns the pipelines buffering settings,
// for the pipeline to use.
// In case of the pipeline itself storing events for reporting ACKs to clients,
// but still dropping events, the pipeline can use the buffer information,
// to define an upper bound of events being active in the pipeline.
type BufferConfig struct {
Events int // can be <= 0, if queue can not determine limit
}
// ProducerConfig as used by the Pipeline to configure some custom callbacks
// between pipeline and queue.
type ProducerConfig struct {
// if ACK is set, the callback will be called with number of events produced
// by the producer instance and being ACKed by the queue.
ACK func(count int)
// OnDrop provided to the queue, to report events being silently dropped by
// the queue. For example an async producer close and publish event,
// with close happening early might result in the event being dropped. The callback
// gives a queue user a chance to keep track of total number of events
// being buffered by the queue.
OnDrop func(beat.Event)
// DropOnCancel is a hint to the queue to drop events if the producer disconnects
// via Cancel.
DropOnCancel bool
}
// Producer interface to be used by the pipelines client to forward events to be
// published to the queue.
// When a producer calls `Cancel`, it's up to the queue to send or remove
// events not yet ACKed.
// Note: A queue is still allowed to send the ACK signal after Cancel. The
// pipeline client must filter out ACKs after cancel.
type Producer interface {
Publish(event publisher.Event) bool
TryPublish(event publisher.Event) bool
Cancel() int
}
// Consumer interface to be used by the pipeline output workers.
// The `Get` method retrieves a batch of events up to size `sz`. If sz <= 0,
// the batch size is up to the queue.
type Consumer interface {
Get(sz int) (Batch, error)
Close() error
}
// Batch of events to be returned to Consumers. The `ACK` method will send the
// ACK signal to the queue.
type Batch interface {
Events() []publisher.Event
ACK()
}