-
Notifications
You must be signed in to change notification settings - Fork 1
/
plm.go
527 lines (452 loc) · 14.3 KB
/
plm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
package wpcm
// copied and modified from https://github.com/blueshift-labs/pulsarlib-go/blob/master/pulsarlib/messaging.go
import (
"context"
"fmt"
"log"
"sync"
"sync/atomic"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)
// RetryMessage
// @Description: Struct for retrying consumed message due to failure. Client can return this struct to ensure the message will be enqueued for a given RetryAfter duration.
type RetryMessage struct {
RetryAfter time.Duration
}
// Message
// @Description: Struct for pulsar message.
type Message struct {
Payload []byte
Properties map[string]string
DeliverAfter time.Duration
}
// Stats
// @Description: Struct for pulsar message stats.
type Stats struct {
TotalMessages uint64
}
// IncrementMessageCount
// @Description: This function will increment the total messages consumed.
// @Param messages
func (s Stats) IncrementMessageCount(messages uint64) {
atomic.AddUint64(&s.TotalMessages, messages)
}
var msging *messaging
// Internal structs
type messageItem struct {
message pulsar.Message
wg *sync.WaitGroup
handler Handler
pulsarc pulsar.Consumer
}
type messaging struct {
nWorkers int
messageCh chan *messageItem
client pulsar.Client
}
func (p *producer) Publish(msgs []*Message) error {
if p.stopped {
return fmt.Errorf("Producer is stopped")
}
for _, msg := range msgs {
pulsarMsg := &pulsar.ProducerMessage{
Payload: msg.Payload,
Properties: msg.Properties,
DeliverAfter: msg.DeliverAfter,
}
_, err := p.pulsarp.Send(context.Background(), pulsarMsg)
if err != nil {
return err
}
}
return nil
}
func (p *producer) PublishOne(msg *Message) error {
if p.stopped {
return fmt.Errorf("Producer is stopped")
}
pulsarMsg := &pulsar.ProducerMessage{
Payload: msg.Payload,
Properties: msg.Properties,
DeliverAfter: msg.DeliverAfter,
}
_, err := p.pulsarp.Send(context.Background(), pulsarMsg)
if err != nil {
return err
}
return nil
}
func (p *producer) Stats() Stats {
return p.stats
}
func (p *producer) Stop() {
if p.stopped {
return
}
p.pulsarp.Close()
p.stopped = true
}
func (m *messaging) processMessageWorker() {
for messageItem := range m.messageCh {
m := &Message{
Payload: messageItem.message.Payload(),
Properties: messageItem.message.Properties(),
}
retry := messageItem.handler.HandleMessage(m)
if retry != nil {
if retry.RetryAfter == 0 {
messageItem.pulsarc.Nack(messageItem.message)
} else {
messageItem.pulsarc.ReconsumeLater(messageItem.message, retry.RetryAfter)
}
} else {
messageItem.pulsarc.Ack(messageItem.message)
}
messageItem.wg.Done()
}
}
func (c *consumer) commit() {
//NoOp for pulsar as consumer are in shared mode and send the acknowledgement individually
}
func (c *consumer) pauseWait() {
<-c.unpauseCh
}
func (c *consumer) messageFetcher() {
for {
ctx, canc := context.WithCancel(c.ctx)
message, err := c.pulsarc.Receive(ctx)
if err != nil && err != context.Canceled {
log.Printf("Error occured in fetching a message. Error: %v", err)
}
canc()
messageItem := &messageItem{
message: message,
wg: c.messageWg,
handler: c.handler,
pulsarc: c.pulsarc,
}
//Message can be nil in case of error
if message != nil {
c.messageWg.Add(1)
c.stats.IncrementMessageCount(1)
msging.messageCh <- messageItem
}
//Check for a pause signal
if c.pauseConsumer {
//Let the fetched messages flush
c.messageWg.Wait()
c.commit()
//Acknowledge the pause signal
c.consumerPausedCh <- true
c.pauseWait()
}
//Check for messageFetcher to be stopped
if c.stopConsumer {
//Let the fetched messages flush
c.messageWg.Wait()
c.commit()
c.consumerStopWg.Done()
return
}
}
}
func (c *consumer) Start() error {
if c.stopConsumer {
return fmt.Errorf("Cannot start a stopped consumer")
}
if c.consumerRunning {
//Consumer is already running
return nil
}
//Start the message fetcher
go c.messageFetcher()
c.consumerRunning = true
return nil
}
func (c *consumer) Stop() error {
if c.stopConsumer {
//Consumer is already stopped
return nil
}
c.consumerStopWg.Add(1)
c.stopConsumer = true
c.canc()
c.consumerStopWg.Wait()
c.consumerRunning = false
c.pulsarc.Close()
return nil
}
func (c *consumer) Unsubscribe() error {
if c.consumerRunning {
//Consumer is running. Stop first
return nil
}
return c.pulsarc.Unsubscribe()
}
func (c *consumer) Pause() {
if c.pauseConsumer {
//Consumer is already paused. Return
return
}
c.pauseConsumer = true
//Wait for the pause to be acknowledged
<-c.consumerPausedCh
}
func (c *consumer) Unpause() {
if !c.pauseConsumer {
//Consumer is not paused
return
}
c.pauseConsumer = false
c.unpauseCh <- true
}
func (c *consumer) Stats() Stats {
return c.stats
}
// InitMessaging
// @Description: This API will initialize the messaging channel.
//
// It will do all the connection initialization.
// workerCount is the number of message processing workers.
//
// @Param workerCount
func InitMessaging(workerCount int, pulsarClientOptions *pulsar.ClientOptions) error {
msging = &messaging{
nWorkers: workerCount,
messageCh: make(chan *messageItem, workerCount*2),
}
//Start the processMessage workers
for i := 0; i < workerCount; i++ {
go msging.processMessageWorker()
}
(*pulsarClientOptions).OperationTimeout = 30 * time.Second
(*pulsarClientOptions).ConnectionTimeout = 30 * time.Second
client, err := pulsar.NewClient(*pulsarClientOptions)
if err != nil {
return fmt.Errorf("could not instantiate Pulsar client: %v", err)
}
msging.client = client
return nil
}
func Cleanup() {
if msging == nil {
return
}
msging.client.Close()
msging = nil
}
type InitialPosition int
const (
// Latest position which means the start consuming position will be the last message
Latest InitialPosition = iota
// Earliest position which means the start consuming position will be the first message
Earliest
)
type ConsumerOpts struct {
SubscriptionName string
RetryEnabled bool
InitialPosition InitialPosition
}
func toInitialPosition(p InitialPosition) pulsar.SubscriptionInitialPosition {
switch p {
case Latest:
return pulsar.SubscriptionPositionLatest
case Earliest:
return pulsar.SubscriptionPositionEarliest
}
return pulsar.SubscriptionPositionEarliest
}
// CreateConsumer
// @Description: This API will create a Consumer for a particular topic.
//
// The handler passed should implement the Handler interface from this module.
// The consumer will create the subscription and be in a passive state until Start() is called.
// The consumer can be Paused and Unpaused at any point.
// The commitInterval used to commit messages after every n messages are consumed.
// The Pause() function will flushout the already received messages and pause receiving any further messages.
// The Unpause() function will resume receiving messages.
// The Stop() function will flush existing messages and stop the consumer. It won't delete the subscription.
// The Unsubscribe() function can be used if subscription needs to be deleted.
// The Stats() function provides the stats for messages consumed.
// Creating multiple instances of Consumer for same topic will deliver message to only one of the instances.
// Inorder to recreate a Consumer for same topic make sure Stop() is called on old Consumer instance.
//
// @Param tenantID
// @Param namespace
// @Param topics
// @Param handler
// @Param opts
func CreateConsumer(tenantID, namespace string, topics []string, handler Handler, opts ConsumerOpts) (Consumer, error) {
//Check if InitMessaging was done prior to this call
if msging == nil {
return nil, fmt.Errorf("InitMessaging not called yet")
}
topicArr := []string{}
for _, tp := range topics {
topicArr = append(topicArr, fmt.Sprintf("persistent://%s/%s/%s", tenantID, namespace, tp))
}
consumerOptions := pulsar.ConsumerOptions{
Topics: topicArr,
SubscriptionName: opts.SubscriptionName,
Type: pulsar.Shared,
SubscriptionInitialPosition: toInitialPosition(opts.InitialPosition),
}
c, err := msging.client.Subscribe(consumerOptions)
if err != nil {
return nil, fmt.Errorf("Error in subscribing to the topics. Error %v", err)
}
ctx, canc := context.WithCancel(context.Background())
consumer := &consumer{
topics: topics,
pulsarc: c,
stats: Stats{},
handler: handler,
ctx: ctx,
canc: canc,
pauseConsumer: false,
consumerPausedCh: make(chan bool, 1),
unpauseCh: make(chan bool, 1),
messageWg: &sync.WaitGroup{},
consumerStopWg: &sync.WaitGroup{},
}
return consumer, nil
}
// CreateSingleTopicConsumer
// @Description: This API will create a Consumer for a particular topic.
//
// The handler passed should implement the Handler interface from this module.
// The consumer will create the subscription and be in a passive state until Start() is called.
// The consumer can be Paused and Unpaused at any point.
// The commitInterval used to commit messages after every n messages are consumed.
// The Pause() function will flushout the already received messages and pause receiving any further messages.
// The Unpause() function will resume receiving messages.
// The Stop() function will flush existing messages and stop the consumer. It won't delete the subscription.
// The Unsubscribe() function can be used if subscription needs to be deleted.
// The Stats() function provides the stats for messages consumed.
// Creating multiple instances of Consumer for same topic will deliver message to only one of the instances.
// Inorder to recreate a Consumer for same topic make sure Stop() is called on old Consumer instance.
// retryEnabled will let the consumer retry message in case of HandleMessage return `RetryMessage` struct.
//
// @Param tenantID
// @Param namespace
// @Param topic
// @Param handler
// @Param opts
func CreateSingleTopicConsumer(tenantID, namespace, topic string, handler Handler, opts ConsumerOpts) (Consumer, error) {
//Check if InitMessaging was done prior to this call
if msging == nil {
return nil, fmt.Errorf("InitMessaging not called yet")
}
topicURI := fmt.Sprintf("persistent://%s/%s/%s", tenantID, namespace, topic)
consumerOptions := pulsar.ConsumerOptions{
Topic: topicURI,
SubscriptionName: opts.SubscriptionName,
Type: pulsar.Shared,
SubscriptionInitialPosition: toInitialPosition(opts.InitialPosition),
}
if opts.RetryEnabled {
// We wanted to retry message for 1 minute before it appended at the back of the DLQ topic.
maxDeliveries := uint32(60) // 1 min
consumerOptions.RetryEnable = true
consumerOptions.NackRedeliveryDelay = 1 * time.Second
consumerOptions.DLQ = &pulsar.DLQPolicy{
MaxDeliveries: maxDeliveries,
DeadLetterTopic: topicURI,
RetryLetterTopic: topicURI,
}
}
c, err := msging.client.Subscribe(consumerOptions)
if err != nil {
return nil, fmt.Errorf("Error in subscribing to the topics. Error %v", err)
}
ctx, canc := context.WithCancel(context.Background())
consumer := &consumer{
topics: []string{topic},
pulsarc: c,
stats: Stats{},
handler: handler,
ctx: ctx,
canc: canc,
pauseConsumer: false,
consumerPausedCh: make(chan bool, 1),
unpauseCh: make(chan bool, 1),
messageWg: &sync.WaitGroup{},
consumerStopWg: &sync.WaitGroup{},
}
return consumer, nil
}
// CreateRegexConsumer
// @Description: This API will create a Consumer for a topics matching the topics pattern.
//
// The handler passed should implement the Handler interface from this module.
// The consumer will create the subscription and be in a passive state until Start() is called.
// The consumer can be Paused and Unpaused at any point.
// The Pause() function will flushout the already received messages and pause receiving any further messages.
// The Unpause() function will resume receiving messages.
// The Stop() function will flush existing messages and stop the consumer. It won't delete the subscription.
// The Unsubscribe() function can be used if subscription needs to be deleted.
// The Stats() function provides the stats for messages consumed.
// Creating multiple instances of Consumer for same topic will deliver message to only one of the instances.
// Inorder to recreate a Consumer for same topic make sure Stop() is called on old Consumer instance.
//
// @Param tenantID
// @Param namespace
// @Param topicsPattern
// @Param handler
// @Param opts
func CreateRegexConsumer(tenantID, namespace, topicsPattern string, handler Handler, opts ConsumerOpts) (Consumer, error) {
//Check if InitMessaging was done prior to this call
if msging == nil {
return nil, fmt.Errorf("InitMessaging not called yet")
}
c, err := msging.client.Subscribe(pulsar.ConsumerOptions{
TopicsPattern: topicsPattern,
SubscriptionName: opts.SubscriptionName,
Type: pulsar.Shared,
SubscriptionInitialPosition: toInitialPosition(opts.InitialPosition),
})
if err != nil {
return nil, fmt.Errorf("Error in subscribing to the topics. Error %v", err)
}
ctx, canc := context.WithCancel(context.Background())
consumer := &consumer{
topicsPattern: topicsPattern,
pulsarc: c,
stats: Stats{},
handler: handler,
ctx: ctx,
canc: canc,
pauseConsumer: false,
consumerPausedCh: make(chan bool, 1),
unpauseCh: make(chan bool, 1),
messageWg: &sync.WaitGroup{},
consumerStopWg: &sync.WaitGroup{},
}
return consumer, nil
}
// CreateProducer
// @Description: This API will create a Producer for a particular topic. The Producer instance can be used to Publish messages to the topic.
// @Param tenantID
// @Param namespace
// @Param topic
func CreateProducer(tenantID string, namespace string, topic string) (Producer, error) {
//Check if InitMessaging was done prior to this call
if msging == nil {
return nil, fmt.Errorf("InitMessaging not called yet")
}
topicPath := fmt.Sprintf("persistent://%s/%s/%s", tenantID, namespace, topic)
p, err := msging.client.CreateProducer(pulsar.ProducerOptions{
Topic: topicPath,
// We wanted to send error in case queue is full, this will give the sender a chance to requeue or retry msg
DisableBlockIfQueueFull: true,
})
if err != nil {
return nil, fmt.Errorf("Error in creating producer. Error %v", err)
}
producer := &producer{
pulsarp: p,
stats: Stats{},
}
return producer, nil
}