/
sqs.go
176 lines (145 loc) · 4.51 KB
/
sqs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package myaws
import (
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
)
// Example is an empty placeholder: it takes no arguments, returns nothing,
// and performs no work.
func Example() {}
// defaultSqsClient builds an SQS client from the configuration returned by
// config.LoadDefaultConfig.
//
// It returns a nil client and a wrapped error when the configuration cannot
// be loaded.
func defaultSqsClient(ctx context.Context) (*sqs.Client, error) {
	awsConfig, loadErr := config.LoadDefaultConfig(ctx)
	if loadErr != nil {
		return nil, fmt.Errorf("error loading default configuration: %w", loadErr)
	}
	return sqs.NewFromConfig(awsConfig), nil
}
// localSqsClient builds an SQS client whose endpoint resolver always answers
// with awsEndpoint as the URL and awsRegion as the signing region. The linked
// references use this setup against LocalStack.
//
// References:
//   - https://github.com/localstack/localstack-aws-sdk-examples/blob/main/go/s3-basic-v2.go
//   - https://towardsaws.com/sns-and-sqs-with-localstack-using-golang-16b291f45e0b
//
// It returns a nil client and a wrapped error when the configuration cannot
// be loaded.
func localSqsClient(ctx context.Context, awsEndpoint string, awsRegion string) (*sqs.Client, error) {
	// TODO deprecated
	// TODO catch error ???
	// The resolver ignores the service and region it is asked about and
	// hands back the same fixed endpoint every time.
	fixedEndpoint := func(_, _ string) (aws.Endpoint, error) {
		target := aws.Endpoint{
			PartitionID:       "aws",
			URL:               awsEndpoint,
			SigningRegion:     awsRegion,
			HostnameImmutable: true,
		}
		return target, nil
	}

	awsConfig, loadErr := config.LoadDefaultConfig(ctx,
		config.WithRegion(awsRegion),
		config.WithEndpointResolver(aws.EndpointResolverFunc(fixedEndpoint)),
	)
	if loadErr != nil {
		return nil, fmt.Errorf("error loading local configuration: %w", loadErr)
	}
	return sqs.NewFromConfig(awsConfig), nil
}
// getQueueUrl asks client for the URL of the queue named queueName.
//
// It returns the QueueUrl field of the GetQueueUrl response, or nil and a
// wrapped error when the call fails.
func getQueueUrl(ctx context.Context, client *sqs.Client, queueName string) (*string, error) {
	out, lookupErr := client.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{QueueName: &queueName})
	if lookupErr != nil {
		return nil, fmt.Errorf("error getting queue url: %w", lookupErr)
	}
	return out.QueueUrl, nil
}
// sendMessage sends body to the queue at queueUrl through client, with
// DelaySeconds fixed at 10.
//
// It returns the MessageId field of the SendMessage response, or nil and a
// wrapped error when the call fails.
func sendMessage(ctx context.Context, client *sqs.Client, queueUrl string, body string) (*string, error) {
	request := sqs.SendMessageInput{
		QueueUrl:     &queueUrl,
		MessageBody:  aws.String(body),
		DelaySeconds: 10,
	}

	out, sendErr := client.SendMessage(ctx, &request)
	if sendErr != nil {
		return nil, fmt.Errorf("error sending message: %w", sendErr)
	}
	return out.MessageId, nil
}
// exampleSend sends the body "hello" to the queue "go-sqs-example" using a
// client pointed at http://localhost:4566 in region us-east-1, announcing the
// queue name on standard output first.
//
// Based on https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/gov2/sqs/SendMessage/SendMessagev2.go
//
// It returns the id of the sent message, or "" and the first error hit while
// creating the client, resolving the queue URL, or sending.
func exampleSend() (string, error) {
	// TODO param
	const (
		endpoint = "http://localhost:4566"
		region   = "us-east-1"
		queue    = "go-sqs-example"
		payload  = "hello"
	)
	ctx := context.TODO()

	banner := fmt.Sprintf("Sending message in queue: %v", queue)
	fmt.Println(banner)

	sqsClient, err := localSqsClient(ctx, endpoint, region)
	if err != nil {
		return "", err
	}

	urlPtr, err := getQueueUrl(ctx, sqsClient, queue)
	if err != nil {
		return "", err
	}

	idPtr, err := sendMessage(ctx, sqsClient, *urlPtr, payload)
	if err != nil {
		return "", err
	}
	return *idPtr, nil
}
// Send runs exampleSend and reports the outcome on standard output.
//
// On failure it prints "FAILURE send <err>" and stops. On success it prints
// "SEND messageId: <id>".
func Send() {
	messageIdSent, err := exampleSend()
	if err != nil {
		fmt.Printf("FAILURE send %v\n", err)
		// Stop here: after a failure there is no message id to report, and
		// printing the success line with an empty id would be misleading.
		return
	}
	fmt.Printf("SEND messageId: %v\n", messageIdSent)
}
// receiveMessage requests at most one message from the queue at queueUrl
// through client, with VisibilityTimeout set to 10.
//
// It returns the id of the first message in the response, the literal string
// "NONE" when the response carries no messages, or "" and a wrapped error
// when the call fails.
func receiveMessage(ctx context.Context, client *sqs.Client, queueUrl string) (string, error) {
	messageInput := &sqs.ReceiveMessageInput{
		// NOTE(review): this passes the queue-attribute enum constant as a
		// message attribute name; presumably its value is "All" — confirm.
		MessageAttributeNames: []string{
			string(types.QueueAttributeNameAll),
		},
		QueueUrl:            &queueUrl,
		MaxNumberOfMessages: 1,
		VisibilityTimeout:   10,
	}
	msgResult, err := client.ReceiveMessage(ctx, messageInput)
	if err != nil {
		return "", fmt.Errorf("error receiving messages: %w", err)
	}
	// TODO map collection []string{}
	// Test the length rather than nil-ness: a non-nil but empty slice would
	// otherwise reach the index below and panic.
	if len(msgResult.Messages) == 0 {
		return "NONE", nil
	}
	return *msgResult.Messages[0].MessageId, nil
}
// exampleReceive performs a single receive on the queue "go-sqs-example"
// using a client pointed at http://localhost:4566 in region us-east-1,
// announcing the queue name on standard output first.
//
// Based on https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/gov2/sqs/ReceiveMessage/ReceiveMessagev2.go
// TODO loop https://github.com/awsdocs/aws-doc-sdk-examples/blob/main/gov2/sqs/ReceiveLPMessage/ReceiveLPMessagev2.go
//
// It returns whatever receiveMessage yields, or "" and the first error hit
// while creating the client, resolving the queue URL, or receiving.
func exampleReceive() (string, error) {
	const (
		endpoint = "http://localhost:4566"
		region   = "us-east-1"
		queue    = "go-sqs-example"
	)
	ctx := context.TODO()

	banner := fmt.Sprintf("Receiving message from queue: %v", queue)
	fmt.Println(banner)

	sqsClient, err := localSqsClient(ctx, endpoint, region)
	if err != nil {
		return "", err
	}

	urlPtr, err := getQueueUrl(ctx, sqsClient, queue)
	if err != nil {
		return "", err
	}

	receivedId, err := receiveMessage(ctx, sqsClient, *urlPtr)
	if err != nil {
		return "", err
	}
	return receivedId, nil
}
// Receive runs exampleReceive and reports the outcome on standard output.
//
// On failure it prints "FAILURE receive <err>" and stops. On success it
// prints "RECEIVE messageId: <id>".
func Receive() {
	// TODO ExampleReceiveLoop
	messageIdReceived, err := exampleReceive()
	if err != nil {
		fmt.Printf("FAILURE receive %v\n", err)
		// Stop here: after a failure there is no message id to report, and
		// printing the success line with an empty id would be misleading.
		return
	}
	fmt.Printf("RECEIVE messageId: %v\n", messageIdReceived)
}