/
consumer.go
54 lines (45 loc) · 1.35 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package sqs
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
config "github.com/tommzn/go-config"
)
// NewConsumer creates a Consumer that can read messages from an AWS SQS
// queue. Construction is delegated to newClientFromConfig using the passed
// config.
func NewConsumer(conf config.Config) Consumer {
	var consumer Consumer = newClientFromConfig(conf)
	return consumer
}
// Receive fetches messages from the queue with the given name.
//
// The queue name is first translated into a queue URL via
// urlForMessageQueue; a single ReceiveMessage request asking for at most 10
// messages is then issued. Every message in the response is converted to a
// RawMessage carrying its message id, receipt handle and body.
//
// If the URL lookup or the receive call fails, an empty (non-nil) slice is
// returned together with the error.
func (client *Client) Receive(queueName string) ([]RawMessage, error) {
	received := []RawMessage{}

	queueURL, err := client.urlForMessageQueue(queueName)
	if err != nil {
		return received, err
	}

	input := &sqs.ReceiveMessageInput{
		MaxNumberOfMessages: aws.Int64(10),
		QueueUrl:            queueURL,
	}
	output, err := client.sqsClient.ReceiveMessage(input)
	if err != nil {
		return received, err
	}

	for _, msg := range output.Messages {
		raw := RawMessage{
			MessageId:     msg.MessageId,
			ReceiptHandle: msg.ReceiptHandle,
			Body:          msg.Body,
		}
		received = append(received, raw)
	}

	// Both fallible steps above succeeded, so there is no error to report.
	return received, nil
}
// Ack confirms that a message was received by deleting it from the given
// queue. The message to delete is identified by receiptHandle.
//
// It returns the error from the queue URL lookup or from the DeleteMessage
// call, and nil when both succeed.
func (client *Client) Ack(queueName string, receiptHandle *string) error {
	queueURL, err := client.urlForMessageQueue(queueName)
	if err != nil {
		return err
	}

	input := &sqs.DeleteMessageInput{
		QueueUrl:      queueURL,
		ReceiptHandle: receiptHandle,
	}
	if _, delErr := client.sqsClient.DeleteMessage(input); delErr != nil {
		return delErr
	}
	return nil
}