Skip to content
This repository has been archived by the owner on Nov 27, 2023. It is now read-only.

Commit

Permalink
Merge 82f2b3f into e577f96
Browse files Browse the repository at this point in the history
  • Loading branch information
mkorszun committed Nov 19, 2018
2 parents e577f96 + 82f2b3f commit b505a10
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 87 deletions.
79 changes: 27 additions & 52 deletions test/gen_rmq_consumer_test.exs
Expand Up @@ -34,24 +34,22 @@ defmodule GenRMQ.ConsumerTest do
end

describe "GenRMQ.Consumer.stop/2" do
test "should close connection after being stopped" do
{:ok, consumer_pid} = Consumer.start_link(Default)
state = :sys.get_state(consumer_pid)
setup do
with_test_consumer(Default)
end

test "should close connection after normal termination", %{consumer: consumer_pid, state: state} do
Consumer.stop(consumer_pid, :normal)

Assert.repeatedly(fn ->
assert Process.alive?(state.conn.pid) == false
end)
assert_receive({:EXIT, ^consumer_pid, :normal})
assert Process.alive?(state.conn.pid) == false
end

test "should send exit signal after abnormal termination" do
Process.flag(:trap_exit, true)
{:ok, consumer_pid} = Consumer.start_link(Default)

Consumer.stop(consumer_pid, :not_normal)
test "should close connection after abnormal termination", %{consumer: consumer_pid, state: state} do
Consumer.stop(consumer_pid, :unexpected_reason)

assert_receive({:EXIT, ^consumer_pid, :not_normal})
assert_receive({:EXIT, ^consumer_pid, :unexpected_reason})
assert Process.alive?(state.conn.pid) == false
end
end

Expand All @@ -71,9 +69,8 @@ defmodule GenRMQ.ConsumerTest do
end)
end

test "should reject a message", %{consumer: consumer_pid} = context do
test "should reject a message", %{state: state} = context do
message = "reject"
state = :sys.get_state(consumer_pid)

publish_message(context[:rabbit_conn], context[:exchange], Poison.encode!(message))

Expand All @@ -82,42 +79,32 @@ defmodule GenRMQ.ConsumerTest do
end)
end

test "should reconnect after connection failure", %{consumer: consumer_pid} = context do
test "should reconnect after connection failure", %{state: state} = context do
message = "disconnect"
state = :sys.get_state(consumer_pid)

AMQP.Connection.close(state.conn)

publish_message(context[:rabbit_conn], context[:exchange], Poison.encode!(message))

Assert.repeatedly(fn ->
assert Agent.get(Default, fn set -> message in set end) == true
end)
end

test "should terminate after queue deletion", %{consumer: consumer_pid} do
Process.flag(:trap_exit, true)
state = :sys.get_state(consumer_pid)

test "should terminate after queue deletion", %{consumer: consumer_pid, state: state} do
AMQP.Queue.delete(state.out, state[:config][:queue])

Assert.repeatedly(fn ->
assert Process.alive?(consumer_pid) == false
end)
end

test "should send exit signal after queue deletion", %{consumer: consumer_pid} do
Process.flag(:trap_exit, true)
state = :sys.get_state(consumer_pid)

test "should send exit signal after queue deletion", %{consumer: consumer_pid, state: state} do
AMQP.Queue.delete(state.out, state[:config][:queue])

assert_receive({:EXIT, ^consumer_pid, :cancelled})
end

test "should close connection and channels after queue deletion", %{consumer: consumer_pid} do
Process.flag(:trap_exit, true)
state = :sys.get_state(consumer_pid)

test "should close connection and channels after queue deletion", %{state: state} do
AMQP.Queue.delete(state.out, state[:config][:queue])

Assert.repeatedly(fn ->
Expand All @@ -127,11 +114,7 @@ defmodule GenRMQ.ConsumerTest do
end)
end

test "should close connection and channels after shutdown signal" do
Process.flag(:trap_exit, true)
{:ok, consumer_pid} = Consumer.start_link(Default)
state = :sys.get_state(consumer_pid)

test "should close connection and channels after shutdown signal", %{consumer: consumer_pid, state: state} do
Process.exit(consumer_pid, :shutdown)

Assert.repeatedly(fn ->
Expand Down Expand Up @@ -164,10 +147,7 @@ defmodule GenRMQ.ConsumerTest do
with_test_consumer(WithoutReconnection)
end

test "should terminate after connection failure", %{consumer: consumer_pid} do
Process.flag(:trap_exit, true)
state = :sys.get_state(consumer_pid)

test "should terminate after connection failure", %{consumer: consumer_pid, state: state} do
AMQP.Connection.close(state.conn)

Assert.repeatedly(fn ->
Expand All @@ -176,10 +156,7 @@ defmodule GenRMQ.ConsumerTest do
end)
end

test "should send exit signal after connection failure", %{consumer: consumer_pid} do
Process.flag(:trap_exit, true)
state = :sys.get_state(consumer_pid)

test "should send exit signal after connection failure", %{consumer: consumer_pid, state: state} do
AMQP.Connection.close(state.conn)

assert_receive({:EXIT, ^consumer_pid, :connection_closed})
Expand All @@ -191,14 +168,14 @@ defmodule GenRMQ.ConsumerTest do
with_test_consumer(WithoutDeadletter)
end

test "should skip deadletter setup", %{consumer: consumer_pid} = context do
test "should skip deadletter setup", %{consumer: consumer_pid, state: state} = context do
message = %{"msg" => "some message"}
state = :sys.get_state(consumer_pid)

publish_message(context[:rabbit_conn], state.config[:exchange], Poison.encode!(message))
publish_message(context[:rabbit_conn], context[:exchange], Poison.encode!(message))

Assert.repeatedly(fn ->
assert Process.alive?(consumer_pid) == true
assert queue_count(context[:rabbit_conn], "#{state.config[:queue]}") == {:ok, 0}
assert queue_count(context[:rabbit_conn], "#{state.config[:queue]}_error") == {:error, :not_found}
end)
end
Expand All @@ -211,9 +188,8 @@ defmodule GenRMQ.ConsumerTest do

test "should deadletter a message to a custom queue", %{consumer: consumer_pid} = context do
message = %{"msg" => "some message"}
state = :sys.get_state(consumer_pid)

publish_message(context[:rabbit_conn], state.config[:exchange], Poison.encode!(message))
publish_message(context[:rabbit_conn], context[:exchange], Poison.encode!(message))

Assert.repeatedly(fn ->
assert Process.alive?(consumer_pid) == true
Expand All @@ -240,14 +216,13 @@ defmodule GenRMQ.ConsumerTest do
end

defp with_test_consumer(module) do
Process.flag(:trap_exit, true)
{:ok, consumer_pid} = Consumer.start_link(module)

exchange =
module
|> :erlang.apply(:init, [])
|> Keyword.get(:exchange)
state = :sys.get_state(consumer_pid)
exchange = state.config[:exchange]

on_exit(fn -> Process.exit(consumer_pid, :normal) end)
{:ok, %{consumer: consumer_pid, exchange: exchange}}
{:ok, %{consumer: consumer_pid, exchange: exchange, state: state}}
end
end
70 changes: 36 additions & 34 deletions test/gen_rmq_publisher_test.exs
Expand Up @@ -2,6 +2,7 @@ defmodule GenRMQ.PublisherTest do
use ExUnit.Case, async: false
use GenRMQ.RabbitCase

alias GenRMQ.Publisher
alias GenRMQ.Test.Assert

@uri "amqp://guest:guest@localhost:5672"
Expand Down Expand Up @@ -30,23 +31,27 @@ defmodule GenRMQ.PublisherTest do
purge_queues(@uri, [@out_queue])
end

describe "GenRMQ.Publisher" do
test "should start new publisher for given module" do
{:ok, pid} = GenRMQ.Publisher.start_link(TestPublisher, name: TestPublisher)
assert pid == Process.whereis(TestPublisher)
describe "start_link/2" do
test "should start a new publisher" do
{:ok, pid} = GenRMQ.Publisher.start_link(TestPublisher)
assert Process.alive?(pid)
end

test "should return publisher config" do
{:ok, config} = GenRMQ.Publisher.init(%{module: TestPublisher})
test "should start a new publisher registered by name" do
{:ok, pid} = GenRMQ.Publisher.start_link(TestPublisher, name: TestPublisher)
assert Process.whereis(TestPublisher) == pid
end
end

assert TestPublisher.init() == config[:config]
describe "GenRMQ.Publisher" do
setup do
with_test_publisher()
end

test "should publish message", context do
test "should publish message", %{publisher: publisher_pid} = context do
message = %{"msg" => "msg"}

{:ok, _} = GenRMQ.Publisher.start_link(TestPublisher, name: TestPublisher)
GenRMQ.Publisher.publish(TestPublisher, Poison.encode!(%{"msg" => "msg"}))
GenRMQ.Publisher.publish(publisher_pid, Poison.encode!(%{"msg" => "msg"}))

Assert.repeatedly(fn -> assert out_queue_count(context) >= 1 end)
{:ok, received_message, meta} = get_message_from_queue(context)
Expand All @@ -56,11 +61,10 @@ defmodule GenRMQ.PublisherTest do
assert [] == meta[:headers]
end

test "should publish message with custom routing key", context do
test "should publish message with custom routing key", %{publisher: publisher_pid} = context do
message = %{"msg" => "msg"}

{:ok, _} = GenRMQ.Publisher.start_link(TestPublisher, name: TestPublisher)
GenRMQ.Publisher.publish(TestPublisher, Poison.encode!(message), "some.routing.key")
GenRMQ.Publisher.publish(publisher_pid, Poison.encode!(message), "some.routing.key")

Assert.repeatedly(fn -> assert out_queue_count(context) >= 1 end)
{:ok, received_message, meta} = get_message_from_queue(context)
Expand All @@ -70,11 +74,10 @@ defmodule GenRMQ.PublisherTest do
assert [] == meta[:headers]
end

test "should publish message with headers", context do
test "should publish message with headers", %{publisher: publisher_pid} = context do
message = %{"msg" => "msg"}

{:ok, _} = GenRMQ.Publisher.start_link(TestPublisher, name: TestPublisher)
GenRMQ.Publisher.publish(TestPublisher, Poison.encode!(message), "some.routing.key", header1: "value")
GenRMQ.Publisher.publish(publisher_pid, Poison.encode!(message), "some.routing.key", header1: "value")

Assert.repeatedly(fn -> assert out_queue_count(context) >= 1 end)
{:ok, received_message, meta} = get_message_from_queue(context)
Expand All @@ -83,13 +86,11 @@ defmodule GenRMQ.PublisherTest do
assert [{"header1", :longstr, "value"}] == meta[:headers]
end

test "should override standard metadata fields from headers", context do
test "should override standard metadata fields from headers", %{publisher: publisher_pid} = context do
message = %{"msg" => "msg"}

{:ok, _} = GenRMQ.Publisher.start_link(TestPublisher, name: TestPublisher)

GenRMQ.Publisher.publish(
TestPublisher,
publisher_pid,
Poison.encode!(message),
"some.routing.key",
message_id: "message_id_1",
Expand All @@ -108,13 +109,11 @@ defmodule GenRMQ.PublisherTest do
assert [{"header1", :longstr, "value"}] == meta[:headers]
end

test "should publish a message with priority", context do
test "should publish a message with priority", %{publisher: publisher_pid} = context do
message = %{"msg" => "with prio"}

{:ok, _} = GenRMQ.Publisher.start_link(TestPublisher, name: TestPublisher)

GenRMQ.Publisher.publish(
TestPublisher,
publisher_pid,
Poison.encode!(message),
"some.routing.key",
priority: 100
Expand All @@ -127,20 +126,17 @@ defmodule GenRMQ.PublisherTest do
assert meta[:priority] == 100
end

test "should reconnect after connection failure", context do
test "should reconnect after connection failure", %{publisher: publisher_pid, state: state} = context do
message = %{"msg" => "pub_disc"}

{:ok, publisher_pid} = GenRMQ.Publisher.start_link(TestPublisher, name: TestPublisher)

state = :sys.get_state(publisher_pid)
Process.exit(state.channel.conn.pid, :kill)

Assert.repeatedly(fn ->
new_state = :sys.get_state(publisher_pid)
assert new_state.channel.conn.pid != state.channel.conn.pid
end)

GenRMQ.Publisher.publish(TestPublisher, Poison.encode!(message))
GenRMQ.Publisher.publish(publisher_pid, Poison.encode!(message))

Assert.repeatedly(fn -> assert out_queue_count(context) >= 1 end)
{:ok, received_message, meta} = get_message_from_queue(context)
Expand All @@ -150,11 +146,7 @@ defmodule GenRMQ.PublisherTest do
assert [] == meta[:headers]
end

test "should close connection and channel on termination" do
Process.flag(:trap_exit, true)
{:ok, publisher_pid} = GenRMQ.Publisher.start_link(TestPublisher)
state = :sys.get_state(publisher_pid)

test "should close connection and channel on termination", %{publisher: publisher_pid, state: state} do
Process.exit(publisher_pid, :shutdown)

Assert.repeatedly(fn ->
Expand All @@ -163,4 +155,14 @@ defmodule GenRMQ.PublisherTest do
end)
end
end

defp with_test_publisher(module \\ TestPublisher) do
Process.flag(:trap_exit, true)
{:ok, publisher_pid} = Publisher.start_link(module)

state = :sys.get_state(publisher_pid)

on_exit(fn -> Process.exit(publisher_pid, :normal) end)
{:ok, %{publisher: publisher_pid, state: state}}
end
end
2 changes: 1 addition & 1 deletion test/test_helper.exs
@@ -1,3 +1,3 @@
ExUnit.start()
ExUnit.start(capture_log: true)

GenRMQ.Test.Assert.start_link([], [])

0 comments on commit b505a10

Please sign in to comment.