-
Notifications
You must be signed in to change notification settings - Fork 1
/
consumer.go
117 lines (100 loc) · 2.94 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package client
import (
"context"
"time"
"github.com/segmentio/kafka-go"
)
// ConsumerStartOffset determines where a consumer begins reading a partition
// when no previously committed offset exists for it.
type ConsumerStartOffset int64
const (
// LastOffset is the most recent offset available for a partition.
// It is the zero value of ConsumerStartOffset, so it is the default
// when ConsumerConfig.StartOffset is left unset.
LastOffset ConsumerStartOffset = iota
// FirstOffset is the least recent offset available for a partition.
FirstOffset
)
// ConsumerConfig carries the options used by Client.NewConsumer to build the
// underlying kafka.Reader. The zero value is usable: StartOffset then defaults
// to LastOffset, and duration fields fall back to the library defaults.
type ConsumerConfig struct {
// GroupID is the consumer group identifier; offsets are committed under it.
GroupID string
// Partition is the specific partition to read from.
// NOTE(review): kafka-go does not allow setting both GroupID and
// Partition — confirm callers only set one of the two.
Partition int
// StartOffset selects where to start reading when no committed offset exists.
StartOffset ConsumerStartOffset
// CommitInterval is how often offsets are flushed to the broker.
CommitInterval time.Duration
// FetchBatchesMaxWait is the maximum time to wait for a fetch batch.
FetchBatchesMaxWait time.Duration
// Logger receives informational messages; may be nil.
Logger Logger
// ErrorLogger receives error messages; may be nil.
ErrorLogger Logger
}
// Consumer provides a high-level API for reading messages from Kafka
type Consumer struct {
// reader is the underlying kafka-go reader; all operations delegate to it.
reader *kafka.Reader
}
// NewConsumer instantiates a new consumer for the given topic, translating
// the ConsumerConfig into the underlying kafka.ReaderConfig.
func (c *Client) NewConsumer(topic string, conf ConsumerConfig) *Consumer { // skipcq: CRT-P0003
	// Map our start-offset enum onto the kafka-go sentinel values.
	startOffset := kafka.FirstOffset
	if conf.StartOffset == LastOffset {
		startOffset = kafka.LastOffset
	}
	return &Consumer{
		reader: kafka.NewReader(kafka.ReaderConfig{
			Brokers:        c.addresses,
			Topic:          topic,
			Dialer:         c.dialer,
			GroupID:        conf.GroupID,
			Partition:      conf.Partition,
			CommitInterval: conf.CommitInterval,
			MaxWait:        conf.FetchBatchesMaxWait,
			StartOffset:    startOffset,
			Logger:         conf.Logger,
			ErrorLogger:    conf.ErrorLogger,
		}),
	}
}
// Close tries to close the consumer, but it will return sooner if the context is canceled.
// A background goroutine will still try to close the consumer since the underlying
// library does not support contexts on Close().
func (c *Consumer) Close(ctx context.Context) error {
	// Buffered so the goroutine can complete even if we return on ctx.Done().
	errCh := make(chan error, 1)
	go func() { errCh <- c.reader.Close() }()

	select {
	case err := <-errCh:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}
// Receive reads and returns the next message from the consumer.
// The method blocks until a message becomes available, or an error occurs.
// The program may also specify a context to asynchronously cancel the blocking operation.
func (c *Consumer) Receive(ctx context.Context) (Message, error) {
	raw, err := c.reader.ReadMessage(ctx)
	if err != nil {
		return Message{}, err
	}

	// Convert kafka-go headers into our own type; stays nil when there are none
	// so the returned Message is indistinguishable from a header-less read.
	var headers []MessageHeader
	if len(raw.Headers) > 0 {
		headers = make([]MessageHeader, 0, len(raw.Headers))
		for _, h := range raw.Headers {
			headers = append(headers, MessageHeader{
				Key:   h.Key,
				Value: h.Value,
			})
		}
	}

	return Message{
		Key:       raw.Key,
		Value:     raw.Value,
		Topic:     raw.Topic,
		Partition: int32(raw.Partition),
		Offset:    raw.Offset,
		Headers:   headers,
		Timestamp: raw.Time,
	}, nil
}
// Ack acknowledges the given messages by committing their offsets to Kafka,
// so they will not be redelivered to this consumer group. It returns sooner
// if the context is canceled.
func (c *Consumer) Ack(ctx context.Context, msgs ...Message) error {
	// Only topic/partition/offset matter for a commit; payloads are dropped.
	toCommit := make([]kafka.Message, len(msgs))
	for i, m := range msgs {
		toCommit[i] = kafka.Message{
			Topic:     m.Topic,
			Partition: int(m.Partition),
			Offset:    m.Offset,
		}
	}
	return c.reader.CommitMessages(ctx, toCommit...)
}