Skip to content

Commit

Permalink
Merge pull request #48 from seatgeek/retry-large-sqs
Browse files Browse the repository at this point in the history
Retry SQS batch when too large
  • Loading branch information
lorenzo authored Jun 27, 2019
2 parents bcfaea0 + 1b0c98c commit 74d2395
Showing 1 changed file with 19 additions and 4 deletions.
23 changes: 19 additions & 4 deletions sink/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"strconv"
"strings"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -143,10 +144,15 @@ func (s *SQSSink) write() {
id = id + 1
}

_, err := s.sqs.SendMessageBatch(&sqs.SendMessageBatchInput{
Entries: entries,
QueueUrl: aws.String(s.queueName),
})
err := s.sendBatch(entries)
if err != nil && strings.Contains(err.Error(), "AWS.SimpleQueueService.BatchRequestTooLong") {
for i := 0; i < len(entries); i += i {
err = s.sendBatch([]*sqs.SendMessageBatchRequestEntry{entries[i]})
if err != nil {
log.Errorf("[sink/sqs] %s", err)
}
}
}

if err != nil {
log.Errorf("[sink/sqs] %s", err)
Expand All @@ -156,3 +162,12 @@ func (s *SQSSink) write() {
}
}
}

func (s *SQSSink) sendBatch(entries []*sqs.SendMessageBatchRequestEntry) error {
_, err := s.sqs.SendMessageBatch(&sqs.SendMessageBatchInput{
Entries: entries,
QueueUrl: aws.String(s.queueName),
})

return err
}

0 comments on commit 74d2395

Please sign in to comment.