forked from scottjbarr/sqsmv
/
main.go
105 lines (82 loc) · 2.05 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package main
import (
"flag"
"fmt"
"log"
"os"
"sync"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
)
func main() {
src := flag.String("src", "", "source queue")
dest := flag.String("dest", "", "destination queue")
flag.Parse()
if *src == "" || *dest == "" {
flag.Usage()
os.Exit(1)
}
if os.Getenv("AWS_REGION") == "" {
fmt.Printf("AWS_REGION not set")
os.Exit(1)
}
region := os.Getenv("AWS_REGION")
log.Printf("source queue : %v", *src)
log.Printf("destination queue : %v", *dest)
config := &aws.Config{
Region: ®ion,
}
client := sqs.New(session.New(), config)
maxMessages := int64(10)
waitTime := int64(0)
messageAttributeNames := aws.StringSlice([]string{"All"})
rmin := &sqs.ReceiveMessageInput{
QueueUrl: src,
MaxNumberOfMessages: &maxMessages,
WaitTimeSeconds: &waitTime,
MessageAttributeNames: messageAttributeNames,
}
// loop as long as there are messages on the queue
for {
resp, err := client.ReceiveMessage(rmin)
if err != nil {
panic(err)
}
if len(resp.Messages) == 0 {
log.Printf("done")
return
}
log.Printf("received %v messages...", len(resp.Messages))
var wg sync.WaitGroup
wg.Add(len(resp.Messages))
for _, m := range resp.Messages {
go func(m *sqs.Message) {
defer wg.Done()
// write the message to the destination queue
smi := sqs.SendMessageInput{
MessageAttributes: m.MessageAttributes,
MessageBody: m.Body,
QueueUrl: dest,
}
_, err := client.SendMessage(&smi)
if err != nil {
log.Printf("ERROR sending message to destination %v", err)
return
}
// message was sent, dequeue from source queue
dmi := &sqs.DeleteMessageInput{
QueueUrl: src,
ReceiptHandle: m.ReceiptHandle,
}
if _, err := client.DeleteMessage(dmi); err != nil {
log.Printf("ERROR dequeueing message ID %v : %v",
*m.ReceiptHandle,
err)
}
}(m)
}
// wait for all jobs from this batch...
wg.Wait()
}
}