-
Notifications
You must be signed in to change notification settings - Fork 13
/
retry_consumer.go
213 lines (201 loc) · 6.62 KB
/
retry_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
package app
import (
"context"
"fmt"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/jitsucom/bulker/kafkabase"
"strconv"
"time"
)
// RetryConsumer consumes events from a retry topic and republishes each one
// either back to its original topic (when its scheduled retry time has come)
// or back onto the retry topic (when it is not yet due). It builds on the
// batching machinery of AbstractBatchConsumer.
type RetryConsumer struct {
	*AbstractBatchConsumer
}
// NewRetryConsumer builds a RetryConsumer on top of an AbstractBatchConsumer
// of mode "retry", wires in the retry-specific batch and should-consume
// callbacks, and starts the consumer in the paused state.
func NewRetryConsumer(repository *Repository, destinationId string, batchPeriodSec int, topicId string, config *Config, kafkaConfig *kafka.ConfigMap, bulkerProducer *Producer) (*RetryConsumer, error) {
	base, err := NewAbstractBatchConsumer(repository, destinationId, batchPeriodSec, topicId, "retry", config, kafkaConfig, bulkerProducer)
	if err != nil {
		return nil, err
	}
	consumer := &RetryConsumer{AbstractBatchConsumer: base}
	// retry-specific implementations of the abstract consumer hooks
	consumer.batchFunc = consumer.processBatchImpl
	consumer.shouldConsumeFunc = consumer.shouldConsumeFuncImpl
	consumer.pause(false)
	return consumer, nil
}
// shouldConsumeFuncImpl peeks at messages between committedOffset and
// highOffset and reports whether at least one of them is already due for
// retry. It reads messages ahead without committing; a deferred
// SeekPartitions rewinds the consumer to the first message read, so the
// subsequent real consume pass starts from the same position.
// Returns true on read errors other than timeout so the consume path can
// surface the error properly.
func (rc *RetryConsumer) shouldConsumeFuncImpl(committedOffset, highOffset int64) bool {
	var firstPosition *kafka.TopicPartition
	defer func() {
		//recover from any panic (e.g. the kafka.Error type assertion below) so
		//the consumer keeps running
		if r := recover(); r != nil {
			rc.SystemErrorf("Recovered from panic: %v", r)
		}
		// rewind to the first peeked message so nothing is skipped by this probe
		if firstPosition != nil {
			_, err := rc.consumer.Load().SeekPartitions([]kafka.TopicPartition{*firstPosition})
			if err != nil {
				rc.SystemErrorf("Failed to seek to first position: %v", err)
				//rc.restartConsumer()
			}
		}
	}()
	currentOffset := committedOffset
	for currentOffset < highOffset {
		message, err := rc.consumer.Load().ReadMessage(rc.waitForMessages)
		if err != nil {
			// NOTE(review): assumes ReadMessage errors are always kafka.Error —
			// a different error type would panic here (caught by the recover above)
			kafkaErr := err.(kafka.Error)
			if kafkaErr.Code() == kafka.ErrTimedOut {
				// no message arrived within waitForMessages: nothing to retry now
				rc.Debugf("Timeout. No messages to retry. %d-%d", committedOffset, highOffset)
				return false
			}
			rc.Infof("Failed to check shouldConsume. %d-%d. Error: %v", committedOffset, highOffset, err)
			// we don't handle errors here. allow consuming to handle error properly
			return true
		}
		// remember the first position so the deferred seek can rewind to it
		if firstPosition == nil {
			firstPosition = &message.TopicPartition
		}
		currentOffset = int64(message.TopicPartition.Offset)
		if rc.isTimeToRetry(message) {
			rc.Debugf("Found message to retry: %d of %d-%d", currentOffset, committedOffset, highOffset)
			//at least one message is ready to retry. we should consume
			return true
		}
	}
	rc.Debugf("No messages to retry. %d-%d", committedOffset, highOffset)
	return false
}
// processBatchImpl consumes up to retryBatchSize messages from the retry
// topic and, within a single Kafka producer transaction, republishes each
// message either to its original topic (taken from the originalTopicHeader,
// when its retry time has come — with the retries counter incremented) or
// back onto this retry topic (when not yet due). Consumer offsets are
// committed atomically with the produced messages via
// SendOffsetsToTransaction. On any error the transaction is aborted, the
// consumer is rewound to the first message of the batch, and the
// notReadyReadded/retryScheduled counters are zeroed.
func (rc *RetryConsumer) processBatchImpl(_ *Destination, _, _, retryBatchSize int, highOffset int64) (counters BatchCounters, nextBatch bool, err error) {
	counters.firstOffset = int64(kafka.OffsetBeginning)
	var firstPosition *kafka.TopicPartition
	var lastPosition *kafka.TopicPartition
	txOpened := false
	var producer *kafka.Producer
	defer func() {
		//recover: turn panics (e.g. the kafka.Error type assertion) into an error result
		if r := recover(); r != nil {
			err = rc.NewError("Recovered from panic: %v", r)
			rc.SystemErrorf("Recovered from panic: %v", r)
		}
		if err != nil {
			// the transaction is aborted, so nothing produced in it counts
			counters.notReadyReadded = 0
			counters.retryScheduled = 0
			//cleanup: rewind so the failed batch will be re-read next time
			if firstPosition != nil {
				_, err2 := rc.consumer.Load().SeekPartitions([]kafka.TopicPartition{*firstPosition})
				if err2 != nil {
					rc.SystemErrorf("Failed to seek to first position: %v", err2)
					//rc.restartConsumer()
				}
			}
			if txOpened {
				_ = producer.AbortTransaction(context.Background())
			}
			nextBatch = false
		}
		if producer != nil {
			producer.Close()
		}
	}()
	nextBatch = true
	for i := 0; i < retryBatchSize; i++ {
		if rc.retired.Load() {
			return
		}
		if lastPosition != nil && int64(lastPosition.Offset) >= highOffset-1 {
			nextBatch = false
			rc.Debugf("Reached watermark offset %d. Stopping batch", highOffset-1)
			// we reached the end of the topic
			break
		}
		// NOTE: := shadows the named return 'err' inside the loop; error paths
		// below return it explicitly, so the deferred cleanup still sees it
		message, err := rc.consumer.Load().ReadMessage(rc.waitForMessages)
		if err != nil {
			kafkaErr := err.(kafka.Error)
			if kafkaErr.Code() == kafka.ErrTimedOut {
				nextBatch = false
				// waitForMessages period is over. it's ok. considering batch as full
				break
			}
			return counters, false, rc.NewError("Failed to consume event from topic. Retryable: %t: %v", kafkaErr.IsRetriable(), kafkaErr)
		}
		counters.consumed++
		lastPosition = &message.TopicPartition
		if counters.consumed == 1 {
			// first message of the batch: remember rewind position and open
			// the producer transaction lazily
			counters.firstOffset = int64(message.TopicPartition.Offset)
			firstPosition = &message.TopicPartition
			producer, err = rc.initTransactionalProducer()
			if err != nil {
				return counters, false, err
			}
			err = producer.BeginTransaction()
			if err != nil {
				return counters, false, fmt.Errorf("failed to begin kafka transaction: %v", err)
			}
			txOpened = true
		}
		singleCount := BatchCounters{}
		originalTopic := kafkabase.GetKafkaHeader(message, originalTopicHeader)
		topic := originalTopic
		if topic == "" {
			// without the original topic the message cannot be routed anywhere
			singleCount.skipped++
			rc.Errorf("Failed to get original topic from message headers. Skipping message")
			continue
		}
		rc.Debugf("message %s header: %v", message.TopicPartition.Offset, message.Headers)
		retries, err := kafkabase.GetKafkaIntHeader(message, retriesCountHeader)
		if err != nil {
			singleCount.skipped++
			rc.Errorf("Failed to get retries count from message headers. Skipping message")
			continue
		}
		headers := message.Headers
		if !rc.isTimeToRetry(message) {
			singleCount.notReadyReadded++
			// retry time is not yet come. requeueing message back to this retry topic
			topic = rc.topicId
		} else {
			// due for retry: send to the original topic with an incremented counter
			retries++
			singleCount.retryScheduled++
		}
		kafkabase.PutKafkaHeader(&headers, retriesCountHeader, strconv.Itoa(retries))
		err = producer.Produce(&kafka.Message{
			Key:            message.Key,
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Headers:        headers,
			Value:          message.Value,
		}, nil)
		if err != nil {
			return counters, false, fmt.Errorf("failed to put message to producer: %v", err)
		}
		counters.accumulate(singleCount)
	}
	if !txOpened {
		// nothing was consumed: no transaction to commit
		return
	}
	groupMetadata, err := rc.consumer.Load().GetConsumerGroupMetadata()
	if err != nil {
		return counters, false, fmt.Errorf("failed to get consumer group metadata: %v", err)
	}
	offset := *lastPosition
	offset.Offset++
	//set consumer offset to the next message after failure. that happens atomically with whole producer transaction
	err = producer.SendOffsetsToTransaction(context.Background(), []kafka.TopicPartition{offset}, groupMetadata)
	if err != nil {
		return counters, false, fmt.Errorf("failed to send consumer offset to producer transaction: %v", err)
	}
	err = producer.CommitTransaction(context.Background())
	if err != nil {
		return counters, false, fmt.Errorf("failed to commit kafka transaction for producer: %v", err)
	}
	return
}
// isTimeToRetry reports whether the message's scheduled retry moment (stored
// in the retryTimeHeader) has already passed. A zero/absent retry time means
// the message may be retried immediately; an unparsable header is logged and
// treated the same way rather than stalling the message forever.
func (rc *RetryConsumer) isTimeToRetry(message *kafka.Message) bool {
	retryTime, err := kafkabase.GetKafkaTimeHeader(message, retryTimeHeader)
	if err != nil {
		rc.Errorf("failed to parse retry_time: %v", err)
		return true
	}
	return retryTime.IsZero() || time.Now().After(retryTime)
}