-
Notifications
You must be signed in to change notification settings - Fork 0
/
sqs_receiver.go
58 lines (48 loc) · 1.47 KB
/
sqs_receiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package myrpc
import (
"context"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/pkg/errors"
)
type sqsReceiver struct {
ctx context.Context
sqs *sqs.SQS
conf ReceiverConf
queueURL string
}
// NewSQSReceiver returns a SQS client using for receiving message
func NewSQSReceiver(ctx context.Context, conf ReceiverConf) (MessageReceiver, error) {
sqsClient, queueURL, err := newSQSClient(ctx, conf.Queue)
if err != nil {
return nil, errors.Wrapf(err, "cannot init sqs client for sqsReceiver with conf: %+v", conf.Queue)
}
return &sqsReceiver{
sqs: sqsClient,
ctx: ctx,
conf: conf,
queueURL: queueURL,
}, nil
}
func (sr *sqsReceiver) ReceiveMsg() ([]*RPCMessage, error) {
param := &sqs.ReceiveMessageInput{
QueueUrl: aws.String(sr.queueURL),
MaxNumberOfMessages: aws.Int64(sr.conf.NumMsgsPerReceive),
VisibilityTimeout: aws.Int64(sr.conf.VisibilityTimeout), // sec
WaitTimeSeconds: aws.Int64(sr.conf.WaitTimeSeconds), // sec
}
resp, err := sr.sqs.ReceiveMessageWithContext(sr.ctx, param)
if err != nil {
return nil, errors.Wrapf(err, "cannot recv message with params: %+v", param)
}
ret := make([]*RPCMessage, len(resp.Messages))
for k, m := range resp.Messages {
rpcMsg, err := JSONToRPCMsg(*m.Body)
if err != nil {
return nil, errors.Wrapf(err, "cannot convert to rpc msg: %+v", m)
}
rpcMsg.msgReceiptHandle = *m.ReceiptHandle
ret[k] = rpcMsg
}
return ret, nil
}