forked from ilge/wabbit
/
exchange.go
118 lines (92 loc) · 3.21 KB
/
exchange.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package main
import (
"flag"
"log"
"github.com/NeowayLabs/wabbit"
"github.com/NeowayLabs/wabbit/amqp"
)
// Command-line flags that parameterize the single publish performed by this
// program. Each variable is a pointer that the flag package fills in when the
// command line is parsed; until then it holds the default given here.
var (
	// uri is the broker address handed to connect.
	uri = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
	// queueName names the queue declared by declareQueue, which asks for a
	// durable, non-auto-deleted queue. The queue's name is also used as the
	// routing key when publishing.
	queueName = flag.String("queue", "test-queue", "Durable AMQP queue name")
	// exchange names the exchange that is declared and published to.
	// NOTE(review): publish passes nil options to ExchangeDeclare, so whether
	// the exchange really is durable depends on the backend's defaults —
	// confirm against the wabbit implementation.
	exchange = flag.String("exchange", "test-exchange", "Durable, non-auto-deleted AMQP exchange name")
	// exchangeType is passed through unchanged as the exchange kind.
	exchangeType = flag.String("exchange-type", "direct", "Exchange type - direct|fanout|topic|x-custom")
	// body is the message payload, sent as raw bytes.
	body = flag.String("body", "body test", "Body of message")
	// reliable selects whether publish enables confirm mode and waits for one
	// confirmation before returning.
	reliable = flag.Bool("reliable", true, "Wait for the publisher confirmation before exiting")
)
// main parses the command-line flags and performs a single publish using
// their values.
//
// flag.Parse is called here rather than from an init function: parsing during
// package initialization happens before the testing package has registered
// its own flags, which makes test binaries for this package fail on flags
// such as -test.v.
func main() {
	flag.Parse()
	publish(*uri, *queueName, *exchange, *exchangeType, *body, *reliable)
}
// publish connects to the broker at uri, declares the exchange and the queue,
// and publishes body to exchange using the queue's name as the routing key.
// Progress is reported through the standard logger.
//
// When reliable is true the channel is switched to confirm mode before the
// message is sent, and publish waits for one confirmation before returning.
//
// Every failure is reported with log.Fatalf, which exits the process
// immediately; deferred cleanup therefore only runs on the success path.
func publish(uri string, queueName string, exchange string, exchangeType string, body string, reliable bool) {
	log.Println("[-] Connecting to", uri)
	connection, err := connect(uri)
	if err != nil {
		log.Fatalf("[x] AMQP connection error: %s", err)
	}
	// Registered first so it runs last: the confirmation wait and the channel
	// close both happen before the connection itself is shut down.
	defer connection.Close()
	log.Println("[√] Connected successfully")

	channel, err := connection.Channel()
	if err != nil {
		log.Fatalf("[x] Failed to open a channel: %s", err)
	}
	defer channel.Close()

	log.Println("[-] Declaring Exchange", exchangeType, exchange)
	// No options are supplied, so the exchange gets the backend's defaults.
	err = channel.ExchangeDeclare(exchange, exchangeType, nil)
	if err != nil {
		log.Fatalf("[x] Failed to declare exchange: %s", err)
	}
	log.Println("[√] Exchange", exchange, "has been declared successfully")

	log.Println("[-] Declaring queue", queueName, "into channel")
	queue, err := declareQueue(queueName, channel)
	if err != nil {
		log.Fatalf("[x] Queue could not be declared. Error: %s", err.Error())
	}
	log.Println("[√] Queue", queueName, "has been declared successfully")

	if reliable {
		log.Printf("[-] Enabling publishing confirms.")
		if err := channel.Confirm(false); err != nil {
			log.Fatalf("[x] Channel could not be put into confirm mode: %s", err)
		}
		// The listener is registered before publishing; the wait itself is
		// deferred so it happens after the message has been sent and before
		// the channel and connection are closed.
		confirms := channel.NotifyPublish(make(chan wabbit.Confirmation, 1))
		defer confirmOne(confirms)
	}

	log.Println("[-] Sending message to queue:", queueName, "- exchange:", exchange)
	log.Println("\t", body)
	err = publishMessage(body, exchange, queue, channel)
	if err != nil {
		log.Fatalf("[x] Failed to publish a message. Error: %s", err.Error())
	}
}
// connect dials the AMQP broker at uri and returns the resulting connection.
//
// On failure it returns a literal nil connection together with the error. If
// amqp.Dial returns a concrete pointer type, forwarding its nil result
// directly would wrap it in a non-nil wabbit.Conn, so the error is checked
// before the value is converted to the interface.
func connect(uri string) (wabbit.Conn, error) {
	conn, err := amqp.Dial(uri)
	if err != nil {
		return nil, err
	}
	return conn, nil
}
// declareQueue declares a queue named queueName on channel and returns the
// result of channel.QueueDeclare unchanged. The declaration passes the
// options durable=true with autoDelete, exclusive and noWait all false.
func declareQueue(queueName string, channel wabbit.Channel) (wabbit.Queue, error) {
	options := wabbit.Option{
		"durable":    true,
		"autoDelete": false,
		"exclusive":  false,
		"noWait":     false,
	}
	return channel.QueueDeclare(queueName, options)
}
// publishMessage sends body, as raw bytes, to exchange on channel, using the
// name of queue as the routing key. It returns whatever channel.Publish
// returns.
func publishMessage(body string, exchange string, queue wabbit.Queue, channel wabbit.Channel) error {
	routingKey := queue.Name()
	payload := []byte(body)
	// NOTE(review): deliveryMode 2 is presumably the AMQP "persistent" mode,
	// and the untyped constant is stored as an int — confirm the backend
	// accepts that type for this option.
	options := wabbit.Option{
		"deliveryMode": 2,
		"contentType":  "text/plain",
	}
	return channel.Publish(exchange, routingKey, payload, options)
}
// confirmOne blocks until one value arrives on confirms and logs whether the
// publishing was acknowledged, including its delivery tag.
//
// If confirms is closed before a confirmation arrives, the receive yields the
// zero value; that case is logged and confirmOne returns instead of calling
// methods on the zero value.
func confirmOne(confirms <-chan wabbit.Confirmation) {
	log.Printf("[-] Waiting for confirmation of one publishing")

	confirmed, ok := <-confirms
	if !ok {
		log.Printf("[x] Confirmation channel was closed before the publishing was confirmed")
		return
	}

	if confirmed.Ack() {
		log.Printf("[√] Confirmed delivery with delivery tag: %d", confirmed.DeliveryTag())
		return
	}
	log.Printf("[x] Failed delivery of delivery tag: %d", confirmed.DeliveryTag())
}