Skip to content
This repository has been archived by the owner on Nov 27, 2023. It is now read-only.

Commit

Permalink
Merge 9426acd into 653a5bd
Browse files Browse the repository at this point in the history
  • Loading branch information
mkorszun committed Aug 3, 2018
2 parents 653a5bd + 9426acd commit f9d1202
Show file tree
Hide file tree
Showing 8 changed files with 144 additions and 80 deletions.
6 changes: 1 addition & 5 deletions .credo.exs
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,7 @@
{Credo.Check.Warning.MapGetUnsafePass, false},

# Deprecated checks (these will be deleted after a grace period)
{Credo.Check.Readability.Specs, false},
{Credo.Check.Warning.NameRedeclarationByAssignment, false},
{Credo.Check.Warning.NameRedeclarationByCase, false},
{Credo.Check.Warning.NameRedeclarationByDef, false},
{Credo.Check.Warning.NameRedeclarationByFn, false}
{Credo.Check.Readability.Specs, false}

# Custom checks can be created using `mix credo.gen.check`.
#
Expand Down
3 changes: 2 additions & 1 deletion .formatter.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
inputs: [
"mix.exs",
"lib/**/*.ex",
"test/**/*.exs"
"test/**/*.exs",
"examples/**/*.ex"
],
line_length: 120
]
8 changes: 7 additions & 1 deletion examples/consumer.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
defmodule ExampleConsumer do
@moduledoc """
Example GenRMQ.Consumer implementation
Example GenRMQ.Consumer implementation.
To try it:
```
MIX_ENV=test iex -S mix
iex(1)> ExampleConsumer.start_link()
```
"""
@behaviour GenRMQ.Consumer

Expand Down
7 changes: 7 additions & 0 deletions examples/publisher.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
defmodule ExamplePublisher do
@moduledoc """
Example GenRMQ.Publisher implementation
To try it:
```
MIX_ENV=test iex -S mix
iex(1)> ExamplePublisher.start_link()
iex(2)> ExamplePublisher.publish_message("test", "routing_key")
```
"""

@behaviour GenRMQ.Publisher
Expand Down
77 changes: 47 additions & 30 deletions lib/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -203,20 +203,29 @@ defmodule GenRMQ.Consumer do
##############################################################################

@doc false
@impl GenServer
def init(%{module: module} = initial_state) do
config = apply(module, :init, [])

initial_state
|> Map.merge(%{config: config, reconnect_attempt: 0})
|> rabbitmq_connect
state =
initial_state
|> Map.put(:config, config)
|> Map.put(:reconnect_attempt, 0)
|> get_connection()
|> open_channels()
|> setup_consumer()

{:ok, state}
end

@doc false
@impl GenServer
def handle_call({:recover, requeue}, _from, %{in: channel} = state) do
{:reply, Basic.recover(channel, requeue: requeue), state}
end

@doc false
@impl GenServer
def handle_info({:DOWN, _ref, :process, _pid, reason}, %{module: module, config: config} = state) do
Logger.info("[#{module}]: RabbitMQ connection is down! Reason: #{inspect(reason)}")

Expand All @@ -226,24 +235,28 @@ defmodule GenRMQ.Consumer do
end

@doc false
@impl GenServer
def handle_info({:basic_consume_ok, %{consumer_tag: consumer_tag}}, %{module: module} = state) do
Logger.info("[#{module}]: Broker confirmed consumer with tag #{consumer_tag}")
{:noreply, state}
end

@doc false
@impl GenServer
def handle_info({:basic_cancel, %{consumer_tag: consumer_tag}}, %{module: module} = state) do
Logger.warn("[#{module}]: The consumer was unexpectedly cancelled, tag: #{consumer_tag}")
{:stop, :cancelled, state}
end

@doc false
@impl GenServer
def handle_info({:basic_cancel_ok, %{consumer_tag: consumer_tag}}, %{module: module} = state) do
Logger.info("[#{module}]: Consumer was cancelled, tag: #{consumer_tag}")
{:noreply, state}
end

@doc false
@impl GenServer
def handle_info({:basic_deliver, payload, attributes}, %{module: module, config: config} = state) do
%{delivery_tag: tag, routing_key: routing_key, redelivered: redelivered} = attributes
Logger.debug("[#{module}]: Received message. Tag: #{tag}, routing key: #{routing_key}, redelivered: #{redelivered}")
Expand All @@ -258,12 +271,14 @@ defmodule GenRMQ.Consumer do
end

@doc false
@impl GenServer
def terminate(:connection_closed = reason, %{module: module}) do
# Since connection has been closed no need to clean it up
Logger.debug("[#{module}]: Terminating consumer, reason: #{inspect(reason)}")
end

@doc false
@impl GenServer
def terminate(reason, %{module: module, conn: conn}) do
Logger.debug("[#{module}]: Terminating consumer, reason: #{inspect(reason)}")
AMQP.Connection.close(conn)
Expand Down Expand Up @@ -291,54 +306,45 @@ defmodule GenRMQ.Consumer do
end

defp handle_reconnect(_, state) do
{:ok, new_state} =
new_state =
state
|> Map.put(:reconnect_attempt, 0)
|> rabbitmq_connect()
|> get_connection()
|> open_channels()
|> setup_consumer()

{:noreply, new_state}
end

defp rabbitmq_connect(%{config: config, module: module, reconnect_attempt: attempt} = state) do
rabbit_uri = config[:uri]
retry_delay_fn = config[:retry_delay_function] || (&linear_delay/1)

case Connection.open(rabbit_uri) do
defp get_connection(%{config: config, module: module, reconnect_attempt: attempt} = state) do
case Connection.open(config[:uri]) do
{:ok, conn} ->
Process.monitor(conn.pid)

{:ok, chan} = Channel.open(conn)
{:ok, out_chan} = Channel.open(conn)

queue = setup_rabbit(chan, config)
consumer_tag = apply(module, :consumer_tag, [])

{:ok, _consumer_tag} = Basic.consume(chan, queue, nil, consumer_tag: consumer_tag)
{:ok, %{in: chan, out: out_chan, conn: conn, config: config, module: module}}
Map.put(state, :conn, conn)

{:error, e} ->
Logger.error(
"[#{module}]: Failed to connect to RabbitMQ with settings: " <>
"#{inspect(strip_key(config, :uri))}, reason #{inspect(e)}"
)

retry_delay_fn = config[:retry_delay_function] || (&linear_delay/1)
next_attempt = attempt + 1
retry_delay_fn.(next_attempt)
rabbitmq_connect(%{state | reconnect_attempt: next_attempt})

state
|> Map.put(:reconnect_attempt, next_attempt)
|> get_connection()
end
end

defp linear_delay(attempt) do
:timer.sleep(attempt * 1_000)
defp open_channels(%{conn: conn} = state) do
{:ok, chan} = Channel.open(conn)
{:ok, out_chan} = Channel.open(conn)
Map.merge(state, %{in: chan, out: out_chan})
end

defp strip_key(keyword_list, key) do
keyword_list
|> Keyword.delete(key)
|> Keyword.put(key, "[FILTERED]")
end

defp setup_rabbit(chan, config) do
defp setup_consumer(%{in: chan, config: config, module: module} = state) do
queue = config[:queue]
exchange = config[:exchange]
routing_key = config[:routing_key]
Expand All @@ -352,7 +358,10 @@ defmodule GenRMQ.Consumer do
Queue.declare(chan, queue, durable: true, arguments: arguments)
Exchange.topic(chan, exchange, durable: true)
Queue.bind(chan, queue, exchange, routing_key: routing_key)
queue

consumer_tag = apply(module, :consumer_tag, [])
{:ok, _consumer_tag} = Basic.consume(chan, queue, nil, consumer_tag: consumer_tag)
state
end

defp setup_deadletter(chan, config) do
Expand All @@ -375,6 +384,14 @@ defmodule GenRMQ.Consumer do
end
end

defp strip_key(keyword_list, key) do
keyword_list
|> Keyword.delete(key)
|> Keyword.put(key, "[FILTERED]")
end

defp linear_delay(attempt), do: :timer.sleep(attempt * 1_000)

defp setup_ttl(arguments, nil), do: arguments
defp setup_ttl(arguments, ttl), do: [{"x-expires", :long, ttl} | arguments]

Expand Down
3 changes: 3 additions & 0 deletions lib/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ defmodule GenRMQ.Publisher do
##############################################################################

@doc false
@impl GenServer
def init(%{module: module} = initial_state) do
config = apply(module, :init, [])

Expand All @@ -125,13 +126,15 @@ defmodule GenRMQ.Publisher do
end

@doc false
@impl GenServer
def handle_call({:publish, msg, key, metadata}, _from, %{channel: channel, config: config} = state) do
metadata = config |> base_metadata() |> merge_metadata(metadata)
result = Basic.publish(channel, config[:exchange], key, msg, metadata)
{:reply, result, state}
end

@doc false
@impl GenServer
def handle_info({:DOWN, _ref, :process, _pid, reason}, %{module: module, config: config}) do
Logger.info("[#{module}]: RabbitMQ connection is down! Reason: #{inspect(reason)}")
{:ok, state} = setup_publisher(%{module: module, config: config})
Expand Down

0 comments on commit f9d1202

Please sign in to comment.