-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.go
166 lines (143 loc) · 4.78 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
/*
* Copyright (C) Continental Automotive GmbH 2019
* Alle Rechte vorbehalten. All Rights Reserved.
* The reproduction, transmission or use of this document or its contents is not
* permitted without express written authority. Offenders will be liable for
* damages. All rights, including rights created by patent grant or registration of
* a utility model or design, are reserved.
*/
package sqs
import (
"context"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"go.uber.org/zap"
"golang.org/x/xerrors"
"github.com/scraly/go.pkg/log"
)
// MessageHandler is responsible to consume a single message.
type MessageHandler func(ctx context.Context, message string) error
// QueueConsumer allows to consume an AWS SQS queue.
type QueueConsumer struct {
svc sqsiface.SQSAPI
queue string
maxNumberOfMessages int64
visibilityTimeout time.Duration
heartbeatInterval time.Duration
waitTime time.Duration
forever bool
}
// NewQueueConsumer creates a QueueConsumer from the given configuration.
func NewQueueConsumer(conf *Configuration, awsSession client.ConfigProvider) *QueueConsumer {
return NewQueueConsumerWithClient(conf, sqs.New(awsSession))
}
// NewQueueConsumerWithClient creates a QueueConsumer from the given configuration and using a
// preconfigured SQS client.
func NewQueueConsumerWithClient(conf *Configuration, sqsClient sqsiface.SQSAPI) *QueueConsumer {
return &QueueConsumer{
svc: sqsClient,
queue: conf.QueueURL,
maxNumberOfMessages: conf.MaxNumberOfMessages,
visibilityTimeout: conf.VisibilityTimeout,
heartbeatInterval: conf.HeartbeatInterval,
waitTime: conf.WaitTime,
forever: conf.Forever,
}
}
var noDelay = time.Duration(0)
func extractDelay(err error) *time.Duration {
if err == nil {
return nil
}
var retriableError RetriableError
if xerrors.As(err, &retriableError) {
return &retriableError.Delay
}
return &noDelay
}
// ConsumeMessages using the given handler.
//
// Each message is kept invisible to other consumers until its handler returns.
// Returns the count of consumed messages along with the encountered error, if any.
func (m *QueueConsumer) ConsumeMessages(ctx context.Context, handler MessageHandler) (int, error) {
consumed := 0
for {
log.For(ctx).Debug("Receive messages",
zap.String("queue", m.queue),
zap.Int64("maxNumberOfMessages", m.maxNumberOfMessages),
zap.Duration("visibilityTimeout", m.visibilityTimeout),
zap.Duration("waitTime", m.waitTime),
)
result, err := m.svc.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{
AttributeNames: []*string{
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
},
MessageAttributeNames: []*string{
aws.String(sqs.QueueAttributeNameAll),
},
QueueUrl: aws.String(m.queue),
MaxNumberOfMessages: aws.Int64(m.maxNumberOfMessages), // The SQS-enforced maximum is 10
VisibilityTimeout: aws.Int64(int64(m.visibilityTimeout / time.Second)),
WaitTimeSeconds: aws.Int64(int64(m.waitTime / time.Second)), // The SQS-enforced maximum is 20s
})
if err != nil {
log.For(ctx).Error("Failed to receive messages", zap.Error(err))
return consumed, err
}
log.For(ctx).Debug("Received messages",
zap.Int("messages", len(result.Messages)),
)
if len(result.Messages) == 0 {
if m.forever {
continue
} else {
break
}
}
// Lock all messages
locks := make([]*MessageLock, len(result.Messages))
for index, message := range result.Messages {
locks[index] = NewMessageLock(ctx, m.svc, m.queue, m.visibilityTimeout, m.heartbeatInterval, message)
}
// Process each message and stop at the first failure
var firstErr error
for _, lock := range locks {
if firstErr != nil {
_ = lock.Release(&noDelay)
continue
}
err := handler(ctx, aws.StringValue(lock.Message().Body))
retryDelay := extractDelay(err)
switch {
case retryDelay == nil:
// No error: having retryDelay to nil marks the message as consumed, it will be deleted.
case *retryDelay != 0:
log.For(ctx).Warn("Schedule message processing to be retried later",
zap.String("messageID", aws.StringValue(lock.Message().MessageId)),
zap.Duration("retryDelay", *retryDelay),
)
default:
log.For(ctx).Error("Failed to process message",
zap.String("messageID", aws.StringValue(lock.Message().MessageId)),
zap.Error(err),
)
firstErr = err
}
err = lock.Release(retryDelay)
if firstErr == nil {
if err != nil {
firstErr = err
} else if retryDelay == nil {
consumed++
}
}
}
if firstErr != nil {
return consumed, firstErr
}
}
return consumed, nil
}