-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
125 lines (103 loc) · 2.82 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"os"
"strconv"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
)
// main drains an SQS queue into a local file and exits with a non-zero
// status when anything goes wrong.
//
// The actual work lives in run so that its deferred cleanup executes before
// the process terminates; os.Exit does not run deferred functions.
func main() {
	os.Exit(run())
}

// run receives every available message from the queue given by -q, appends
// each one as a JSON document (one per line) to the file given by -f, and
// deletes the messages from the queue once they have been written.
//
// Flags:
//
//	-q  queue URL (required)
//	-t  visibility timeout in seconds, clamped to [0, 12h] (default 20)
//	-f  output file name; the file is created or truncated (default msgs.txt)
//
// It returns the process exit code: 0 on success, 1 on any failure.
// Diagnostics are printed to standard output.
func run() (exitCode int) {
	queueURL := flag.String("q", "", "The queue url")
	timeout := flag.Int("t", 20, "How long, in seconds, that the message is hidden from others")
	fileName := flag.String("f", "msgs.txt", "The name of the output file")
	flag.Parse()

	if *queueURL == "" {
		fmt.Println("You must supply the url of a queue (-q QUEUE_URL)")
		return 1
	}

	// Clamp the visibility timeout to the range 0 seconds .. 12 hours
	// (12 hours is the documented SQS maximum).
	const maxVisibilityTimeout = 12 * 60 * 60
	if *timeout < 0 {
		*timeout = 0
	}
	if *timeout > maxVisibilityTimeout {
		*timeout = maxVisibilityTimeout
	}

	// Open the output file for writing.
	file, err := os.Create(*fileName)
	if err != nil {
		fmt.Println("Error creating file:", err)
		return 1
	}
	defer func() {
		// A failed Close can mean the last writes never reached the file, so
		// surface it instead of silently reporting success.
		if cerr := file.Close(); cerr != nil {
			fmt.Println("Error closing file:", cerr)
			exitCode = 1
		}
	}()

	// The encoder writes one JSON document per line straight to the file.
	encoder := json.NewEncoder(file)

	ctx := context.Background()

	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		// Report like every other failure instead of panicking with a stack trace.
		fmt.Println("configuration error, " + err.Error())
		return 1
	}
	client := sqs.NewFromConfig(cfg)

	attrs, err := client.GetQueueAttributes(ctx, &sqs.GetQueueAttributesInput{
		QueueUrl:       queueURL,
		AttributeNames: []types.QueueAttributeName{"ApproximateNumberOfMessages"},
	})
	if err != nil {
		fmt.Println("Got an error getting the queue attributes")
		fmt.Println(err)
		return 1
	}

	approximateMessages, err := strconv.Atoi(attrs.Attributes["ApproximateNumberOfMessages"])
	if err != nil {
		fmt.Println("Got an error converting string to int")
		fmt.Println(err)
		return 1
	}

	// Nothing to do when the queue reports itself as empty.
	if approximateMessages <= 0 {
		return 0
	}

	// Keep polling until a receive comes back empty.
	for {
		msgResult, err := client.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
			// NOTE(review): reuses the queue-attribute constant as a message
			// attribute selector; presumably its value is "All" — confirm.
			MessageAttributeNames: []string{
				string(types.QueueAttributeNameAll),
			},
			QueueUrl:            queueURL,
			MaxNumberOfMessages: 10,
			VisibilityTimeout:   int32(*timeout),
			WaitTimeSeconds:     20,
		})
		if err != nil {
			fmt.Println("Got an error receiving messages:")
			fmt.Println(err)
			return 1
		}

		// Test the length rather than nil-ness: an empty, non-nil slice must
		// also end the loop, otherwise an empty delete batch would be sent.
		if len(msgResult.Messages) == 0 {
			fmt.Println("No messages found")
			return 0
		}

		deleteEntries := make([]types.DeleteMessageBatchRequestEntry, 0, len(msgResult.Messages))
		for _, msg := range msgResult.Messages {
			// Write the message to the file before scheduling its deletion.
			if err := encoder.Encode(msg); err != nil {
				fmt.Println("Error encoding msg JSON:", err)
				return 1
			}
			deleteEntries = append(deleteEntries, types.DeleteMessageBatchRequestEntry{
				Id:            msg.MessageId,
				ReceiptHandle: msg.ReceiptHandle,
			})
		}

		delResult, err := client.DeleteMessageBatch(ctx, &sqs.DeleteMessageBatchInput{
			QueueUrl: queueURL,
			Entries:  deleteEntries,
		})
		if err != nil {
			fmt.Println("Got an error deleting the messages:")
			fmt.Println(err)
			return 1
		}

		// A batch delete can succeed as a request while individual entries
		// fail; those are reported in Failed rather than through err.
		if len(delResult.Failed) > 0 {
			fmt.Println("Got an error deleting the messages:")
			for _, failed := range delResult.Failed {
				fmt.Printf("message %s: %s: %s\n",
					derefString(failed.Id), derefString(failed.Code), derefString(failed.Message))
			}
			return 1
		}
	}
}

// derefString returns the string s points to, or "" when s is nil.
func derefString(s *string) string {
	if s == nil {
		return ""
	}
	return *s
}