-
Notifications
You must be signed in to change notification settings - Fork 17
/
consumer.go
115 lines (97 loc) · 2.99 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package sqs
import (
"context"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
aws_sqs "github.com/aws/aws-sdk-go-v2/service/sqs"
aws_sqs_types "github.com/aws/aws-sdk-go-v2/service/sqs/types"
)
// ConsumerOption represents a consumer option function.
type ConsumerOption func(*Consumer)
// Consumer represents SQS consumer.
type Consumer struct {
api *aws_sqs.Client
url string
maxMessages int32
visibilityTimeout int32
waitTimeSeconds int32
}
// New instances of a Consumer to consume SQS messages.
func NewConsumer(awsConfig aws.Config, url string, opts ...ConsumerOption) (*Consumer, error) {
consumer := &Consumer{
api: aws_sqs.NewFromConfig(awsConfig),
url: url,
maxMessages: 10,
visibilityTimeout: 60,
waitTimeSeconds: 20,
}
for _, opt := range opts {
opt(consumer)
}
return consumer, nil
}
// WithMaxMessages allows to specify an maximum number of messages to return when setting a value.
func WithMaxMessages(v int32) ConsumerOption {
return func(c *Consumer) {
c.maxMessages = v
}
}
// WithVisibilityTimeout allows to specify a visibility timeout when setting a value.
func WithVisibilityTimeout(v int32) ConsumerOption {
return func(c *Consumer) {
c.visibilityTimeout = v
}
}
// WithWaitTimeSeconds allows to specify a wait time when setting a value.
func WithWaitTimeSeconds(v int32) ConsumerOption {
return func(c *Consumer) {
c.waitTimeSeconds = v
}
}
// GetMessages retrieves messages from SQS.
func (c *Consumer) GetMessages(ctx context.Context) ([]aws_sqs_types.Message, error) {
params := &aws_sqs.ReceiveMessageInput{
QueueUrl: aws.String(c.url),
MaxNumberOfMessages: c.maxMessages,
AttributeNames: []aws_sqs_types.QueueAttributeName{
aws_sqs_types.QueueAttributeNameAll,
},
MessageAttributeNames: []string{
string(aws_sqs_types.QueueAttributeNameAll),
},
WaitTimeSeconds: c.waitTimeSeconds,
VisibilityTimeout: c.visibilityTimeout,
}
res, err := c.api.ReceiveMessage(ctx, params)
if err != nil {
return nil, err
}
return res.Messages, nil
}
// DeleteMessage deletes messages from SQS.
func (c *Consumer) DeleteMessage(ctx context.Context, id *string) error {
params := &aws_sqs.DeleteMessageInput{
QueueUrl: aws.String(c.url),
ReceiptHandle: id,
}
_, err := c.api.DeleteMessage(ctx, params)
return err
}
// GetVisibilityTimeout returns visibility timeout.
func (c *Consumer) GetVisibilityTimeout() time.Duration {
return time.Duration(int64(c.visibilityTimeout) * int64(time.Second))
}
// GetQueueAttributes get queue attributes.
func (c *Consumer) GetQueueAttributes(ctx context.Context) (*aws_sqs.GetQueueAttributesOutput, error) {
params := &aws_sqs.GetQueueAttributesInput{
QueueUrl: aws.String(c.url),
AttributeNames: []aws_sqs_types.QueueAttributeName{
aws_sqs_types.QueueAttributeNameCreatedTimestamp,
},
}
return c.api.GetQueueAttributes(ctx, params)
}
// GetQueueUrl returns queue url.
func (c *Consumer) GetQueueUrl() string {
return c.url
}