Skip to content
This repository has been archived by the owner on Nov 27, 2023. It is now read-only.

Support default exchange in consumer and publisher modules #213

Merged
merged 7 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 12 additions & 2 deletions lib/binding.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ defmodule GenRMQ.Binding do
declaring consumer bindings and exchanges.
"""

@type exchange :: String.t() | {exchange_kind(), String.t()}
@type exchange :: String.t() | {exchange_kind(), String.t()} | :default
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add something that explains :default in the documentation here as well? I think it would be the first place I would look.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vorce good point. Will update / extend the documentation.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vorce added some docs. Let me know if this is smth you had in mind.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mkorszun yes, perfect 👍

@type exchange_kind :: :topic | :direct | :fanout

use AMQP

@default_exchange ""

@doc false
def bind_exchange_and_queue(chan, exchange, queue, routing_key) do
declare_exchange(chan, exchange)
Expand All @@ -29,6 +31,8 @@ defmodule GenRMQ.Binding do
end

@doc false
def declare_exchange(_chan, :default), do: :ok

def declare_exchange(chan, {:direct, exchange}) do
Exchange.direct(chan, exchange, durable: true)
end
Expand All @@ -45,6 +49,10 @@ defmodule GenRMQ.Binding do
Exchange.topic(chan, exchange, durable: true)
end

def exchange_name(:default) do
@default_exchange
end

def exchange_name({_, exchange}) do
exchange
end
Expand All @@ -56,13 +64,15 @@ defmodule GenRMQ.Binding do

defp bind_queue(chan, queue, exchange_name, routing_key) when is_list(routing_key) do
Enum.reduce_while(routing_key, :ok, fn rk, acc ->
case Queue.bind(chan, queue, exchange_name, routing_key: rk) do
case bind_queue(chan, queue, exchange_name, rk) do
:ok -> {:cont, acc}
err -> {:halt, err}
end
end)
end

defp bind_queue(_chan, _queue, :default, _routing_key), do: :ok

defp bind_queue(chan, queue, exchange_name, routing_key) do
Queue.bind(chan, queue, exchange_name, routing_key: routing_key)
end
Expand Down
4 changes: 2 additions & 2 deletions lib/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ defmodule GenRMQ.Consumer do

`queue` - the name of the queue to consume. If it does not exist, it will be created.

`exchange` - name or `{type, name}` of the exchange to which `queue` should be bound. If it does not exist, it will be created.
For valid exchange types see `GenRMQ.Binding`.
`exchange` - name, `:default` or `{type, name}` of the target exchange.
If it does not exist, it will be created. Supported types: `:direct`, `:fanout`, `:topic`

`routing_key` - queue binding key, can also be a list.

Expand Down
4 changes: 2 additions & 2 deletions lib/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ defmodule GenRMQ.Publisher do

`connection` - RabbitMQ connection options. Accepts same arguments as AMQP-library's [Connection.open/2](https://hexdocs.pm/amqp/AMQP.Connection.html#open/2).

`exchange` - name or `{type, name}` of the target exchange. If it does not exist, it will be created.
For valid exchange types see `GenRMQ.Binding`.
`exchange` - name, `:default` or `{type, name}` of the target exchange.
If it does not exist, it will be created. Supported types: `:direct`, `:fanout`, `:topic`

### Optional:

Expand Down
30 changes: 27 additions & 3 deletions test/gen_rmq_consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ defmodule GenRMQ.ConsumerTest do
WithTopicExchange,
WithoutConcurrency,
WithoutDeadletter,
WithoutReconnection
WithoutReconnection,
WithDefaultExchange
}

@connection "amqp://guest:guest@localhost:5672"
Expand Down Expand Up @@ -277,7 +278,7 @@ defmodule GenRMQ.ConsumerTest do
with_test_consumer(ErrorWithoutConcurrency)
end

test "should receive invoke the handle_error callback if an error is encountered with no concurrency",
test "should invoke the handle_error callback if an error is encountered with no concurrency",
%{consumer: consumer_pid, state: state} = context do
clear_mailbox()

Expand Down Expand Up @@ -593,6 +594,29 @@ defmodule GenRMQ.ConsumerTest do
close_connection_and_channels_after_shutdown_test()
end

describe "TestConsumer.WithDefaultExchange" do
setup do
Agent.start_link(fn -> MapSet.new() end, name: WithDefaultExchange)
{:ok, context} = with_test_consumer(WithDefaultExchange)
routing_key = context.state.config[:queue][:name]
{:ok, Map.put(context, :routing_key, routing_key)}
end

receive_message_test(WithDefaultExchange)

reject_message_test()

reconnect_after_connection_failure_test(WithDefaultExchange)

terminate_after_queue_deletion_test()

exit_signal_after_queue_deletion_test()

close_connection_and_channels_after_deletion_test()

close_connection_and_channels_after_shutdown_test()
end

describe "Telemetry events" do
setup :attach_telemetry_handlers

Expand Down Expand Up @@ -654,7 +678,7 @@ defmodule GenRMQ.ConsumerTest do
exchange = state.config[:exchange]

on_exit(fn -> Process.exit(consumer_pid, :normal) end)
{:ok, %{consumer: consumer_pid, exchange: exchange, state: state}}
{:ok, %{consumer: consumer_pid, exchange: exchange, state: state, routing_key: "#"}}
end

defp clear_mailbox do
Expand Down
21 changes: 20 additions & 1 deletion test/gen_rmq_publisher_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ defmodule GenRMQ.PublisherTest do
alias TestPublisher.{
Default,
RedeclaringExistingExchange,
WithConfirmations
WithConfirmations,
WithDefaultExchange
}

@connection "amqp://guest:guest@localhost:5672"
Expand Down Expand Up @@ -334,6 +335,24 @@ defmodule GenRMQ.PublisherTest do
end
end

describe "TestPublisher.WithDefaultExchange" do
setup do
with_test_publisher(WithDefaultExchange)
end

test "should publish a message to the default exchange", %{publisher: publisher_pid} = context do
message = %{"msg" => "with default exchange"}
# https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-default
# all queues are bound to the default exchange with a queue name as a binding key
routing_key = context.out_queue
publish_result = GenRMQ.Publisher.publish(publisher_pid, Jason.encode!(message), routing_key)

Assert.repeatedly(fn -> assert out_queue_count(context) >= 1 end)
assert match?({:ok, ^message, _}, get_message_from_queue(context))
assert {:ok, :confirmed} == publish_result
end
end

describe "Telemetry events" do
setup :attach_telemetry_handlers

Expand Down
2 changes: 1 addition & 1 deletion test/support/assert.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule GenRMQ.Test.Assert do
Agent.start_link(fn -> Map.new() end, name: __MODULE__)
end

def repeatedly(function, time \\ 10_000, interval \\ 250) do
def repeatedly(function, time \\ 15_000, interval \\ 250) do
task = Task.async(fn -> repeatedly_loop(function, interval) end)

case Task.yield(task, time) do
Expand Down
6 changes: 3 additions & 3 deletions test/support/consumer_shared_tests.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule ConsumerSharedTests do
test "should receive a message", context do
message = %{"msg" => "some message"}

publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message))
publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message), context[:routing_key])

GenRMQ.Test.Assert.repeatedly(fn ->
assert Agent.get(unquote(mod), fn set -> message in set end) == true
Expand All @@ -22,7 +22,7 @@ defmodule ConsumerSharedTests do
test "should reject a message", %{state: state} = context do
message = "reject"

publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message))
publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message), context[:routing_key])

GenRMQ.Test.Assert.repeatedly(fn ->
assert queue_count(context[:rabbit_conn], state.config[:queue][:dead_letter][:name]) == {:ok, 1}
Expand All @@ -37,7 +37,7 @@ defmodule ConsumerSharedTests do
message = "disconnect"
AMQP.Connection.close(state.conn)

publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message))
publish_message(context[:rabbit_conn], context[:exchange], Jason.encode!(message), context[:routing_key])

GenRMQ.Test.Assert.repeatedly(fn ->
assert Agent.get(unquote(mod), fn set -> message in set end) == true
Expand Down
33 changes: 33 additions & 0 deletions test/support/test_consumers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,39 @@ defmodule TestConsumer do
end
end

defmodule WithDefaultExchange do
@moduledoc false
@behaviour GenRMQ.Consumer

def init() do
[
queue: "gen_rmq_with_default_exchange",
exchange: :default,
prefetch_count: "10",
connection: "amqp://guest:guest@localhost:5672",
queue_ttl: 1_000
]
end

def consumer_tag() do
"TestConsumer.WithDefaultExchange"
end

def handle_message(%GenRMQ.Message{payload: "\"reject\""} = message) do
GenRMQ.Consumer.reject(message)
end

def handle_message(message) do
payload = Jason.decode!(message.payload)
Agent.update(__MODULE__, &MapSet.put(&1, payload))
GenRMQ.Consumer.ack(message)
end

def handle_error(message, _reason) do
GenRMQ.Consumer.reject(message)
end
end

defmodule RedeclaringExistingExchange do
@moduledoc false
@behaviour GenRMQ.Consumer
Expand Down
14 changes: 14 additions & 0 deletions test/support/test_publishers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,18 @@ defmodule TestPublisher do
]
end
end

defmodule WithDefaultExchange do
@moduledoc false
@behaviour GenRMQ.Publisher

def init() do
[
exchange: :default,
connection: "amqp://guest:guest@localhost:5672",
app_id: :my_app_id,
enable_confirmations: true
]
end
end
end