/
consumer.go
130 lines (114 loc) · 3.31 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package broker
import (
"context"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"github.com/Shopify/sarama"
"golang.org/x/exp/slog"
)
type AfterConsumedFunc = func([]byte) error
type Consumer interface {
Consume(topics string)
Close() error
}
type consumer struct {
ready chan bool
cg sarama.ConsumerGroup
afterConsumedFunc AfterConsumedFunc
}
func NewConsumer(addrs, groupID string, afterConsumedFunc AfterConsumedFunc) Consumer {
slog.Info("broker: initializing a consumer group", slog.String("addrs", addrs), slog.String("groupID", groupID))
cg, err := sarama.NewConsumerGroup(strings.Split(addrs, ","), groupID, sarama.NewConfig())
if err != nil {
panic(err)
}
slog.Info("broker: initialized a consumer group")
return &consumer{ready: make(chan bool), cg: cg, afterConsumedFunc: afterConsumedFunc}
}
func (c *consumer) Consume(topic string) {
slog.Info("broker: consuming", slog.String("topic", topic))
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := c.cg.Consume(ctx, []string{topic}, c); err != nil {
slog.Error("consumer: failed to consume", err)
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
return
}
c.ready = make(chan bool)
}
}()
<-c.ready
sigusr1 := make(chan os.Signal, 1)
signal.Notify(sigusr1, syscall.SIGUSR1)
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
keepRunning, consumptionIsPaused := true, false
for keepRunning {
select {
case <-ctx.Done():
slog.Warn("consumer: context cancelled after terminated")
keepRunning = false
case <-sigterm:
slog.Warn("consumer: terminated via signal")
keepRunning = false
case <-sigusr1:
toggleConsumptionFlow(c.cg, &consumptionIsPaused)
}
}
cancel()
wg.Wait()
}
// Setup is run at the beginning of a new session, before ConsumeClaim
func (c *consumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(c.ready)
return nil
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (c *consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (c *consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
select {
case message := <-claim.Messages():
slog.Info("consumer: message claimed",
slog.String("messageValue", string(message.Value)),
slog.Time("timestamp", message.Timestamp),
slog.String("topic", message.Topic),
)
if err := c.afterConsumedFunc(message.Value); err != nil {
return err
}
session.MarkMessage(message, "")
case <-session.Context().Done():
return nil
}
}
}
func (c *consumer) Close() error {
return c.cg.Close()
}
func toggleConsumptionFlow(cg sarama.ConsumerGroup, isPaused *bool) {
if *isPaused {
cg.ResumeAll()
slog.Info("consumer: resuming consumption")
} else {
cg.PauseAll()
slog.Info("consumer: pausing consumption")
}
*isPaused = !*isPaused
}