This repository has been archived by the owner on Nov 27, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 44
/
rabbit_case.ex
94 lines (79 loc) · 2.67 KB
/
rabbit_case.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
defmodule GenRMQ.RabbitCase do
  @moduledoc """
  This module defines the setup for tests requiring
  access to the rabbit mq.

  `use GenRMQ.RabbitCase` injects helper functions into the calling module
  for opening connections and channels, publishing messages, reading a
  message back from a queue, purging queues and counting queued messages.
  """

  defmacro __using__([]) do
    quote do
      use AMQP

      @doc """
      Opens an AMQP connection. `connection` is handed unchanged to
      `AMQP.Connection.open/1` and its result is returned as-is.
      """
      def rmq_open(connection), do: AMQP.Connection.open(connection)

      @doc """
      Opens a new channel on an already opened connection and returns the
      result of `AMQP.Channel.open/1`.
      """
      def open_channel(connection), do: AMQP.Channel.open(connection)

      @doc """
      Publishes `message` to `exchange` over a short-lived channel.

      The exchange is declared first through `GenRMQ.Binding.declare_exchange/2`
      and the name to publish to is resolved with
      `GenRMQ.Binding.exchange_name/1`. `routing_key` defaults to `"#"` and
      `meta` (the publish options) defaults to `[]`.

      Returns the result of closing the channel. Raises `MatchError` when the
      channel cannot be opened.
      """
      def publish_message(conn, exchange, message, routing_key \\ "#", meta \\ []) do
        {:ok, channel} = AMQP.Channel.open(conn)
        GenRMQ.Binding.declare_exchange(channel, exchange)

        AMQP.Basic.publish(
          channel,
          GenRMQ.Binding.exchange_name(exchange),
          routing_key,
          message,
          meta
        )

        AMQP.Channel.close(channel)
      end

      @doc """
      Declares `out_queue`, declares `out_exchange` as a durable topic
      exchange and binds the queue to the exchange with routing key `"#"`.

      Returns the result of closing the temporary channel.
      """
      def setup_out_queue(conn, out_queue, out_exchange) do
        {:ok, chan} = AMQP.Channel.open(conn)
        AMQP.Queue.declare(chan, out_queue)
        AMQP.Exchange.topic(chan, out_exchange, durable: true)
        AMQP.Queue.bind(chan, out_queue, out_exchange, routing_key: "#")
        AMQP.Channel.close(chan)
      end

      @doc """
      Same as `get_message_from_queue/2`, taking the connection from
      `context[:rabbit_conn]` and the queue name from `context[:out_queue]`.
      """
      def get_message_from_queue(context) do
        get_message_from_queue(context[:rabbit_conn], context[:out_queue])
      end

      @doc """
      Fetches one message from `queue` and JSON-decodes its payload.

      Returns `{:ok, decoded_payload, meta}`. Raises `MatchError` when the
      fetch does not return a message (for example on an empty queue) and
      whatever `Jason.decode!/1` raises when the payload is not valid JSON.
      """
      def get_message_from_queue(conn, queue) do
        {:ok, chan} = AMQP.Channel.open(conn)

        try do
          # `no_ack: true` makes the broker treat the message as delivered
          # right away. Without it the message would stay unacknowledged and
          # be requeued as soon as the channel below is closed.
          {:ok, payload, meta} = AMQP.Basic.get(chan, queue, no_ack: true)
          {:ok, Jason.decode!(payload), meta}
        after
          # Always release the channel; it used to be left open on every call.
          AMQP.Channel.close(chan)
        end
      end

      @doc """
      Purges every queue in `queues` over a dedicated connection, ignoring
      queues whose purge exits (see `purge_queue/2`).

      `connection` is passed to `rmq_open/1`. The connection is closed even
      when purging fails. Returns `:ok`.
      """
      def purge_queues(connection, queues) do
        {:ok, conn} = rmq_open(connection)

        try do
          Enum.each(queues, &purge_queue(conn, &1))
        after
          AMQP.Connection.close(conn)
        end
      end

      @doc """
      Purges every queue in `queues` over a dedicated connection, failing on
      the first queue that cannot be purged (see `purge_queue!/2`).

      `connection` is passed to `rmq_open/1`. The connection is closed even
      when purging fails. Returns `:ok` when every queue was purged.
      """
      def purge_queues!(connection, queues) do
        {:ok, conn} = rmq_open(connection)

        try do
          Enum.each(queues, &purge_queue!(conn, &1))
        after
          AMQP.Connection.close(conn)
        end
      end

      @doc """
      Purges `queue`, returning `:ok` instead of failing when the purge exits.
      """
      def purge_queue(conn, queue) do
        purge_queue!(conn, queue)
      catch
        # Best effort: any exit raised while purging is deliberately swallowed.
        :exit, _ -> :ok
      end

      @doc """
      Purges `queue` over a short-lived channel and returns the result of
      closing that channel. Failures are not handled here and propagate to
      the caller.
      """
      def purge_queue!(conn, queue) do
        {:ok, chan} = AMQP.Channel.open(conn)
        AMQP.Queue.purge(chan, queue)
        AMQP.Channel.close(chan)
      end

      @doc """
      Returns the message count of the queue named by `context[:out_queue]`,
      using the connection in `context[:rabbit_conn]`.
      """
      def out_queue_count(context) do
        queue_count!(context[:rabbit_conn], context[:out_queue])
      end

      @doc """
      Returns the message count of the queue named by `context[:dl_queue]`,
      using the connection in `context[:rabbit_conn]`.
      """
      def dl_queue_count(context) do
        queue_count!(context[:rabbit_conn], context[:dl_queue])
      end

      @doc """
      Returns the `message_count` reported for `queue`.

      The count is read with a passive queue declaration on a short-lived
      channel. Failures are not handled here and propagate to the caller.
      """
      def queue_count!(conn, queue) do
        {:ok, chan} = AMQP.Channel.open(conn)
        {:ok, %{message_count: count}} = AMQP.Queue.declare(chan, queue, passive: true)
        AMQP.Channel.close(chan)
        count
      end

      @doc """
      Non-raising variant of `queue_count!/2`.

      Returns `{:ok, count}`, or `{:error, :not_found}` when the lookup exits.
      """
      def queue_count(conn, queue) do
        {:ok, queue_count!(conn, queue)}
      catch
        # Every exit is reported as a missing queue, whatever its reason.
        :exit, _ -> {:error, :not_found}
      end
    end
  end
end