/
internal_consumer_group_handler.go
50 lines (41 loc) · 1.18 KB
/
internal_consumer_group_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package gkafka
import (
"context"
"github.com/Shopify/sarama"
"github.com/neutrinocorp/gluon"
)
// internalConsumerGroupHandler bundles the driver and the subscriber that a
// consumer group callback needs in order to dispatch incoming Kafka messages.
type internalConsumerGroupHandler struct {
// parentDriver is the driver whose message handler and logging settings are
// used by this handler's methods.
parentDriver *driver
// sub is the subscriber passed to the driver's message handler for every
// consumed message.
sub *gluon.Subscriber
}
// Compile-time check that *internalConsumerGroupHandler implements
// sarama.ConsumerGroupHandler.
var _ sarama.ConsumerGroupHandler = (*internalConsumerGroupHandler)(nil)
// newInternalConsumerGroup returns a handler bound to driver d and
// subscriber s. Neither argument is validated here.
func newInternalConsumerGroup(d *driver, s *gluon.Subscriber) *internalConsumerGroupHandler {
	handler := new(internalConsumerGroupHandler)
	handler.parentDriver = d
	handler.sub = s
	return handler
}
// Setup is a no-op: it ignores the session and always returns nil.
func (i *internalConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error {
return nil
}
// Cleanup is a no-op: it ignores the session and always returns nil.
func (i *internalConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim reads every message delivered on claim and hands it to the
// parent driver's message handler on behalf of the handler's subscriber.
//
// A message is marked on session only when the message handler returns a nil
// error. When the message handler fails, the error is reported through
// logError (previously it was silently discarded), the message is left
// unmarked, and processing continues with the next message.
//
// The method returns nil once the claim's message channel is closed; it never
// returns a non-nil error itself.
func (i *internalConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim) error {
	for kMsg := range claim.Messages() {
		// NOTE(review): a placeholder context is used for each message, so the
		// message handler is not cancelled when the session ends. Consider
		// deriving it from session.Context() — confirm handler expectations
		// first.
		scopedCtx := context.TODO()

		msg := new(gluon.TransportMessage)
		unmarshalKafkaMessage(kMsg, msg)

		if err := i.parentDriver.messageHandler(scopedCtx, i.sub, msg); err != nil {
			// Surface the failure instead of dropping it; the message stays
			// unmarked.
			i.logError(err)
			continue
		}
		session.MarkMessage(kMsg, "")
	}
	return nil
}
// logError prints err with the parent bus logger, but only when the parent
// driver reports that logging is enabled; otherwise it does nothing.
func (i *internalConsumerGroupHandler) logError(err error) {
	if !i.parentDriver.isLoggingEnabled() {
		return
	}
	i.parentDriver.parentBus.Logger.Print(err)
}