-
Notifications
You must be signed in to change notification settings - Fork 0
/
ack.go
37 lines (30 loc) · 1.19 KB
/
ack.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package consume
import (
"context"
"time"
"github.com/apache/pulsar-client-go/pulsar"
)
// MessageHandler is a function that will be called everytime a message arrives at a consumer.
type MessageHandler func(ctx context.Context, msg pulsar.Message) MessageHandlerResult
// MessageHandlerResult will be returned by the MessageHandler. The MessageHandlerResult will Ack/Nack the message.
type MessageHandlerResult func(consumer pulsar.Consumer, msg pulsar.Message) error
// Ack should be returned by a MessageHandler whenever the message needs to be acknowledged.
func Ack() MessageHandlerResult {
return func(consumer pulsar.Consumer, msg pulsar.Message) error {
return consumer.Ack(msg)
}
}
// Nack should be returned by a MessageHandler whenever the message needs to be NOT acknowledged.
func Nack() MessageHandlerResult {
return func(consumer pulsar.Consumer, msg pulsar.Message) error {
consumer.Nack(msg)
return nil
}
}
// Later should be returned by a MessageHandler whenever the message needs to be re-consumed later.
func Later(delay time.Duration) MessageHandlerResult {
return func(consumer pulsar.Consumer, msg pulsar.Message) error {
consumer.ReconsumeLater(msg, delay)
return nil
}
}