forked from premendrasingh/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
produce.go
131 lines (110 loc) · 2.59 KB
/
produce.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package memqueue
import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common/atomic"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/queue"
)
// forgetfullProducer is the producer handed out by newProducer when no ACK
// callback is supplied. Its push requests carry only the event, and its
// Cancel always reports 0.
type forgetfullProducer struct {
// broker is the queue broker this producer was created for.
broker *Broker
// openState holds the open/closed status and the channel events are sent on.
openState openState
}
// ackProducer is the producer handed out by newProducer when an ACK callback
// is supplied. Every push request it creates carries a sequence number and a
// pointer to the producer's produceState.
type ackProducer struct {
// broker is the queue broker this producer was created for; Cancel sends
// its cancel request on broker.pubCancel.
broker *Broker
// cancel is newProducer's dropOnCancel argument: when true, Cancel sends a
// cancel request to the broker and returns the removed count it reports.
cancel bool
// seq is the sequence number given to the next request. newProducer starts
// it at 1 and makeRequest increments it after each use.
seq uint32
// state holds the callbacks; a pointer to it is attached to every push and
// cancel request.
state produceState
// openState holds the open/closed status and the channel events are sent on.
openState openState
}
// openState tracks whether a producer is still open and owns the channels
// used to publish events and to signal closing.
type openState struct {
// isOpen is created as true by newProducer and set to false by Close.
isOpen atomic.Bool
// done is closed by Close; publish and tryPublish select on it to stop
// sending.
done chan struct{}
// events is the broker's events channel (b.events). publish and tryPublish
// set it to nil once they observe done being closed.
events chan pushRequest
}
// produceState is the per-producer state of an ackProducer. A pointer to it is
// attached to every push request and cancel request sent to the broker.
type produceState struct {
// cb is the ACK callback passed to newProducer.
cb ackHandler
// dropCB is the drop callback passed to newProducer.
dropCB func(beat.Event)
// cancelled is not written anywhere in this file.
// NOTE(review): presumably set by the broker when it handles a cancel
// request — confirm.
cancelled bool
// lastACK is not written anywhere in this file.
// NOTE(review): presumably the last acknowledged sequence number,
// maintained by the broker's ACK handling — confirm.
lastACK uint32
}
// ackHandler is the ACK callback type accepted by newProducer.
// NOTE(review): count is presumably the number of newly acknowledged events —
// confirm against the code that invokes the handler.
type ackHandler func(count int)
// newProducer creates a queue.Producer that publishes into the broker's events
// channel.
//
// When cb is nil a forgetfullProducer is returned. Otherwise an ackProducer is
// returned whose state carries cb and dropCB, whose sequence numbers start at
// 1, and whose cancel flag is dropOnCancel.
func newProducer(b *Broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel bool) queue.Producer {
	st := openState{
		isOpen: atomic.MakeBool(true),
		done:   make(chan struct{}),
		events: b.events,
	}

	// No ACK callback: the producer does not need sequence numbers or state.
	if cb == nil {
		return &forgetfullProducer{broker: b, openState: st}
	}

	return &ackProducer{
		broker:    b,
		cancel:    dropOnCancel,
		seq:       1,
		state:     produceState{cb: cb, dropCB: dropCB},
		openState: st,
	}
}
// Publish wraps event in a push request and hands it to openState.publish,
// returning that call's result.
func (p *forgetfullProducer) Publish(event publisher.Event) bool {
	req := p.makeRequest(event)
	return p.openState.publish(req)
}
// TryPublish wraps event in a push request and hands it to
// openState.tryPublish, returning that call's result.
func (p *forgetfullProducer) TryPublish(event publisher.Event) bool {
	req := p.makeRequest(event)
	return p.openState.tryPublish(req)
}
// makeRequest builds a push request that carries only the event; all other
// request fields keep their zero values.
func (p *forgetfullProducer) makeRequest(event publisher.Event) pushRequest {
	var req pushRequest
	req.event = event
	return req
}
// Cancel closes the producer's open state. It always returns 0.
func (p *forgetfullProducer) Cancel() int {
	const removed = 0
	p.openState.Close()
	return removed
}
// Publish wraps event in a sequenced push request and hands it to
// openState.publish, returning that call's result.
func (p *ackProducer) Publish(event publisher.Event) bool {
	req := p.makeRequest(event)
	return p.openState.publish(req)
}
// TryPublish wraps event in a sequenced push request and hands it to
// openState.tryPublish, returning that call's result.
func (p *ackProducer) TryPublish(event publisher.Event) bool {
	req := p.makeRequest(event)
	return p.openState.tryPublish(req)
}
// makeRequest builds a push request carrying the event, the producer's current
// sequence number, and a pointer to the producer's state. The sequence number
// is advanced by one for the next request.
func (p *ackProducer) makeRequest(event publisher.Event) pushRequest {
	current := p.seq
	p.seq++
	return pushRequest{event: event, seq: current, state: &p.state}
}
// Cancel closes the producer's open state. If the producer was created with
// dropOnCancel set, it additionally sends a cancel request to the broker,
// blocks until the broker answers, and returns the removed count from the
// response. Otherwise it returns 0.
func (p *ackProducer) Cancel() int {
	p.openState.Close()

	if !p.cancel {
		return 0
	}

	respCh := make(chan producerCancelResponse)
	request := producerCancelRequest{
		state: &p.state,
		resp:  respCh,
	}
	p.broker.pubCancel <- request

	// Block until the broker has processed the cancel request.
	return (<-respCh).removed
}
// Close marks the state as no longer open and closes the done channel so that
// publish and tryPublish stop sending.
//
// Close is idempotent: only the call that flips isOpen from true to false
// closes done. Later calls are no-ops instead of panicking with
// "close of closed channel".
func (st *openState) Close() {
	// CAS succeeds for exactly one caller, which makes that caller the sole
	// owner of the close.
	if st.isOpen.CAS(true, false) {
		close(st.done)
	}
}
// publish blocks until req has been sent on the events channel or the done
// channel is closed. It returns true if req was sent. When done is observed,
// the events channel reference is set to nil and false is returned.
func (st *openState) publish(req pushRequest) bool {
	sent := false
	select {
	case <-st.done:
		// Drop the channel reference so later sends can never proceed.
		st.events = nil
	case st.events <- req:
		sent = true
	}
	return sent
}
// tryPublish attempts to send req on the events channel without blocking. It
// returns true only if req was sent. When the done channel is observed closed,
// the events channel reference is set to nil. If neither channel is ready, it
// returns false immediately.
func (st *openState) tryPublish(req pushRequest) bool {
	sent := false
	select {
	case <-st.done:
		// Drop the channel reference so later sends can never proceed.
		st.events = nil
	case st.events <- req:
		sent = true
	default:
		// Nothing ready; report failure without blocking.
	}
	return sent
}