Skip to content

Commit

Permalink
fix(kafka): simple retry logic fetch msg
Browse files Browse the repository at this point in the history
  • Loading branch information
wojciech12 committed Feb 10, 2020
1 parent 515cf67 commit 74293da
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 11 deletions.
19 changes: 17 additions & 2 deletions messaging/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,26 @@ type KafkaReader struct {
// readBroker us as a wrapper for kafka.Reader implementation to fulfill BrokerReader interface
type readBroker struct {
*kafka.Reader
maxRetries int
retriesInterval time.Duration
}

// FetchMessages used to fetch messages from the broker
func (rm *readBroker) FetchMessage(ctx context.Context) (Message, error) {
m, err := rm.Reader.FetchMessage(ctx)
var err error
var m kafka.Message
var retryNumber = 0

for retryNumber <= rm.maxRetries {
m, err = rm.Reader.FetchMessage(ctx)
if err != nil {
retryNumber = retryNumber + 1
log.Errorf("# messaging # retry number %v failed, trying again, err: %v", retryNumber, err)
time.Sleep(rm.retriesInterval)
continue
}
break
}

if err != nil {
return Message{}, err
Expand Down Expand Up @@ -110,7 +125,7 @@ func NewReader(brokers []string, groupID string, topic string) *KafkaReader {
return &KafkaReader{brokers: brokers,
groupID: groupID,
topic: topic,
brokerReader: &readBroker{kafkaReader},
brokerReader: &readBroker{kafkaReader, retries, intervalTime},
maxRetries: retries,
retriesInterval: intervalTime,
}
Expand Down
19 changes: 10 additions & 9 deletions messaging/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package messaging
import (
"context"
"fmt"
"github.com/microdevs/missy/service"
"testing"
"time"

"github.com/microdevs/missy/service"

"reflect"

"bou.ke/monkey"
Expand Down Expand Up @@ -198,7 +199,7 @@ func TestReadBroker_FetchMessage(t *testing.T) {

defer monkey.Unpatch(kr.FetchMessage)

rb := readBroker{kr}
rb := readBroker{kr, 0, 0}

msg, err := rb.FetchMessage(context.Background())

Expand Down Expand Up @@ -256,7 +257,7 @@ func TestReadBroker_FetchMessage_Error(t *testing.T) {

defer monkey.Unpatch(kr.FetchMessage)

rb := readBroker{kr}
rb := readBroker{kr, 1, 1}

_, err := rb.FetchMessage(context.Background())

Expand Down Expand Up @@ -287,7 +288,7 @@ func TestReadBroker_ReadMessage(t *testing.T) {

defer monkey.Unpatch(kr.ReadMessage)

rb := readBroker{kr}
rb := readBroker{kr, 0, 0}

msg, err := rb.ReadMessage(context.Background())

Expand Down Expand Up @@ -334,7 +335,7 @@ func TestReadBroker_ReadMessage_Error(t *testing.T) {

defer monkey.Unpatch(kr.ReadMessage)

rb := readBroker{kr}
rb := readBroker{kr, 0, 0}

_, err := rb.ReadMessage(context.Background())

Expand Down Expand Up @@ -371,7 +372,7 @@ func TestReadBroker_CommitMessages(t *testing.T) {

defer monkey.Unpatch(kr.CommitMessages)

rb := readBroker{kr}
rb := readBroker{kr, 0, 0}

err := rb.CommitMessages(context.Background(), messages...)

Expand Down Expand Up @@ -409,7 +410,7 @@ func TestReadBroker_CommitMessages_Error(t *testing.T) {

defer monkey.Unpatch(kr.CommitMessages)

rb := readBroker{kr}
rb := readBroker{kr, 0, 0}

err := rb.CommitMessages(context.Background(), messages...)

Expand Down Expand Up @@ -441,7 +442,7 @@ func TestReadBroker_Close(t *testing.T) {

defer monkey.Unpatch(kr.Close)

rb := readBroker{kr}
rb := readBroker{kr, 0, 0}

err := rb.Close()

Expand Down Expand Up @@ -473,7 +474,7 @@ func TestReadBroker_Close_Error(t *testing.T) {

defer monkey.Unpatch(kr.Close)

rb := readBroker{kr}
rb := readBroker{kr, 0, 0}

err := rb.Close()

Expand Down

0 comments on commit 74293da

Please sign in to comment.