producer.go
package producer

import (
	"fmt"
	"sync"
	"time"

	"github.com/mailgun/kafka-pixy/Godeps/_workspace/src/github.com/mailgun/log"
	"github.com/mailgun/kafka-pixy/Godeps/_workspace/src/github.com/mailgun/sarama"
	"github.com/mailgun/kafka-pixy/config"
)

const (
	maxEncoderReprLength = 4096
)

// T builds on top of `sarama.AsyncProducer` to improve shutdown handling.
// The problem it solves is that `sarama.AsyncProducer` drops all buffered
// messages as soon as it is ordered to shut down. In contrast, when `T` is
// ordered to stop, it allows some time for buffered messages to be committed
// to the Kafka cluster, and only when that time has elapsed does it drop
// uncommitted messages.
//
// TODO Consider implementing some sort of dead message processing.
type T struct {
	baseCID         *sarama.ContextID
	saramaClient    sarama.Client
	saramaProducer  sarama.AsyncProducer
	shutdownTimeout time.Duration
	deadMessageCh   chan<- *sarama.ProducerMessage
	dispatcherCh    chan *sarama.ProducerMessage
	resultCh        chan produceResult
	wg              sync.WaitGroup
}

type produceResult struct {
	Msg *sarama.ProducerMessage
	Err error
}

// Spawn creates a producer instance and starts its internal goroutines.
func Spawn(cfg *config.T) (*T, error) {
	saramaCfg := sarama.NewConfig()
	saramaCfg.ChannelBufferSize = cfg.Producer.ChannelBufferSize
	saramaCfg.Producer.RequiredAcks = sarama.WaitForAll
	saramaCfg.Producer.Return.Successes = true
	saramaCfg.Producer.Return.Errors = true
	saramaCfg.Producer.Compression = sarama.CompressionSnappy
	saramaCfg.Producer.Retry.Backoff = 10 * time.Second
	saramaCfg.Producer.Retry.Max = 6
	saramaCfg.Producer.Flush.Frequency = 500 * time.Millisecond
	saramaCfg.Producer.Flush.Bytes = 1024 * 1024

	saramaClient, err := sarama.NewClient(cfg.Kafka.SeedPeers, saramaCfg)
	if err != nil {
		return nil, fmt.Errorf("failed to create sarama.Client, err=(%s)", err)
	}
	saramaProducer, err := sarama.NewAsyncProducerFromClient(saramaClient)
	if err != nil {
		return nil, fmt.Errorf("failed to create sarama.Producer, err=(%s)", err)
	}

	p := &T{
		baseCID:         sarama.RootCID.NewChild("producer"),
		saramaClient:    saramaClient,
		saramaProducer:  saramaProducer,
		shutdownTimeout: cfg.Producer.ShutdownTimeout,
		deadMessageCh:   cfg.Producer.DeadMessageCh,
		dispatcherCh:    make(chan *sarama.ProducerMessage, cfg.Producer.ChannelBufferSize),
		resultCh:        make(chan produceResult, cfg.Producer.ChannelBufferSize),
	}
	spawn(&p.wg, p.merge)
	spawn(&p.wg, p.dispatch)
	return p, nil
}

// Stop shuts down all producer goroutines and releases all resources.
func (p *T) Stop() {
	close(p.dispatcherCh)
	p.wg.Wait()
}

// Produce submits a message to the specified `topic` of the Kafka cluster
// using `key` to identify a destination partition. The exact algorithm used
// to map keys to partitions is implementation specific, but it is guaranteed
// to return consistent results. If `key` is `nil`, then the message is placed
// into a random partition.
//
// Errors usually indicate a catastrophic failure of the Kafka cluster, or a
// missing topic if the cluster is not configured to auto-create topics.
func (p *T) Produce(topic string, key, message sarama.Encoder) (*sarama.ProducerMessage, error) {
	replyCh := make(chan produceResult, 1)
	prodMsg := &sarama.ProducerMessage{
		Topic:    topic,
		Key:      key,
		Value:    message,
		Metadata: replyCh,
	}
	p.dispatcherCh <- prodMsg
	result := <-replyCh
	return result.Msg, result.Err
}

// AsyncProduce is the asynchronous counterpart of the `Produce` function.
// Errors are silently ignored.
func (p *T) AsyncProduce(topic string, key, message sarama.Encoder) {
	prodMsg := &sarama.ProducerMessage{
		Topic: topic,
		Key:   key,
		Value: message,
	}
	p.dispatcherCh <- prodMsg
}
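
// The snippet below sketches both produce flavours. The function name, topic,
// and key values are made up for illustration; only the method signatures
// above are real.
func exampleProduce(p *T) {
	// Synchronous: blocks until the message is acknowledged by Kafka or the
	// producer reports an error.
	if _, err := p.Produce("illustrative-topic", sarama.StringEncoder("user-42"),
		sarama.StringEncoder("hello")); err != nil {
		log.Errorf("Failed to produce: err=(%s)", err)
	}

	// Asynchronous: fire-and-forget; failures are only logged internally and,
	// if configured, forwarded to the dead message channel.
	p.AsyncProduce("illustrative-topic", nil, sarama.ByteEncoder([]byte("payload")))
}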

// merge receives both message acknowledgements and producer errors from the
// respective `sarama.AsyncProducer` channels, constructs `ProducerResult`s out
// of them and sends the constructed `ProducerResult` instances to `resultCh`
// to be further inspected by the `dispatcher` goroutine.
//
// It keeps running until both `sarama.AsyncProducer` output channels are
// closed. Then it closes the `resultCh` to notify the `dispatcher` goroutine
// that all pending messages have been processed and exits.
func (p *T) merge() {
	cid := p.baseCID.NewChild("merge")
	defer cid.LogScope()()
	nilOrProdSuccessesCh := p.saramaProducer.Successes()
	nilOrProdErrorsCh := p.saramaProducer.Errors()
mergeLoop:
	for channelsOpened := 2; channelsOpened > 0; {
		select {
		case ackedMsg, ok := <-nilOrProdSuccessesCh:
			if !ok {
				channelsOpened -= 1
				nilOrProdSuccessesCh = nil
				continue mergeLoop
			}
			p.resultCh <- produceResult{Msg: ackedMsg}
		case prodErr, ok := <-nilOrProdErrorsCh:
			if !ok {
				channelsOpened -= 1
				nilOrProdErrorsCh = nil
				continue mergeLoop
			}
			p.resultCh <- produceResult{Msg: prodErr.Msg, Err: prodErr.Err}
		}
	}
	// Close the result channel to notify the `dispatcher` goroutine that all
	// pending messages have been processed.
	close(p.resultCh)
}

// dispatch implements message processing and graceful shutdown. It receives
// messages from `dispatcherCh`, where they are sent by the `Produce` method,
// and submits them to the embedded `sarama.AsyncProducer`. The dispatcher's
// main purpose is to prevent loss of messages during shutdown. It achieves
// that by allowing a grace period, after it stops receiving messages, before
// stopping the embedded `sarama.AsyncProducer`.
func (p *T) dispatch() {
	cid := p.baseCID.NewChild("dispatch")
	defer cid.LogScope()()
	nilOrDispatcherCh := p.dispatcherCh
	var nilOrProdInputCh chan<- *sarama.ProducerMessage
	pendingMsgCount := 0
	// The normal operation loop is implemented as a two-stroke machine. On the
	// first stroke a message is received from `dispatcherCh`, and on the second
	// it is sent to `prodInputCh`. Note that producer results can be received
	// at any time.
	prodMsg := (*sarama.ProducerMessage)(nil)
	channelOpened := true
	for {
		select {
		case prodMsg, channelOpened = <-nilOrDispatcherCh:
			if !channelOpened {
				goto gracefulShutdown
			}
			pendingMsgCount += 1
			nilOrDispatcherCh = nil
			nilOrProdInputCh = p.saramaProducer.Input()
		case nilOrProdInputCh <- prodMsg:
			nilOrDispatcherCh = p.dispatcherCh
			nilOrProdInputCh = nil
		case prodResult := <-p.resultCh:
			pendingMsgCount -= 1
			p.handleProduceResult(cid, prodResult)
		}
	}
gracefulShutdown:
	// Give the `sarama.AsyncProducer` some time to commit buffered messages.
	log.Infof("<%v> About to stop producer: pendingMsgCount=%d", cid, pendingMsgCount)
	shutdownTimeoutCh := time.After(p.shutdownTimeout)
	for pendingMsgCount > 0 {
		select {
		case <-shutdownTimeoutCh:
			goto shutdownNow
		case prodResult := <-p.resultCh:
			pendingMsgCount -= 1
			p.handleProduceResult(cid, prodResult)
		}
	}
shutdownNow:
	log.Infof("<%v> Stopping producer: pendingMsgCount=%d", cid, pendingMsgCount)
	p.saramaProducer.AsyncClose()
	for prodResult := range p.resultCh {
		p.handleProduceResult(cid, prodResult)
	}
}
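
// The function below is a stripped-down sketch of the nil-channel gating used
// by `dispatch` above: a `select` case on a nil channel never fires, so
// flipping the channel variables between nil and a live channel alternates
// the loop between the "receive" and "send" strokes. It is illustrative only
// and is not used anywhere in this package.
func twoStrokeSketch(in <-chan int, out chan<- int) {
	var pending int
	nilOrIn := in
	var nilOrOut chan<- int // nil: the send stroke is disabled initially
	for {
		select {
		case v, ok := <-nilOrIn:
			if !ok {
				return
			}
			pending = v
			nilOrIn, nilOrOut = nil, out // switch to the send stroke
		case nilOrOut <- pending:
			nilOrIn, nilOrOut = in, nil // switch back to the receive stroke
		}
	}
}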

// handleProduceResult inspects a produce result and, if it is an error, logs
// it and pushes the message down the `deadMessageCh` if one has been
// configured.
func (p *T) handleProduceResult(cid *sarama.ContextID, result produceResult) {
	if replyCh, ok := result.Msg.Metadata.(chan produceResult); ok {
		replyCh <- result
	}
	if result.Err == nil {
		return
	}
	prodMsgRepr := fmt.Sprintf(`{Topic: "%s", Key: "%s", Value: "%s"}`,
		result.Msg.Topic, encoderRepr(result.Msg.Key), encoderRepr(result.Msg.Value))
	log.Errorf("<%v> Failed to submit message: msg=%v, err=(%s)",
		cid, prodMsgRepr, result.Err)
	if p.deadMessageCh != nil {
		p.deadMessageCh <- result.Msg
	}
}

// encoderRepr returns the string representation of an encoder value. The value
// is truncated to `maxEncoderReprLength`.
func encoderRepr(e sarama.Encoder) string {
	var repr string
	switch e := e.(type) {
	case sarama.StringEncoder:
		repr = string(e)
	case sarama.ByteEncoder:
		repr = fmt.Sprintf("%X", []byte(e))
	default:
		repr = fmt.Sprint(e)
	}
	if length := len(repr); length > maxEncoderReprLength {
		repr = fmt.Sprintf("%s... (%d bytes more)",
			repr[:maxEncoderReprLength], length-maxEncoderReprLength)
	}
	return repr
}

// spawn starts function `f` as a goroutine, making it a member of the `wg`
// wait group.
func spawn(wg *sync.WaitGroup, f func()) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		f()
	}()
}