Plug.AMQP provides an AMQP interface to Plug. With Plug.AMQP we can write servers that answer requests sent through an AMQP broker, such as RabbitMQ. The request-response pattern is explained in detail here.
We can use plug_amqp in our projects by adding the dependency:
def deps do
  [
    {:plug_amqp, "~> 0.5"}
  ]
end
To use Plug.AMQP, add it to the supervision tree:
children = [
  {Plug.AMQP, connection_options: "amqp://my-rabbit:5672", plug: MyPlug}
]
Supervisor.start_link(children, strategy: :one_for_one)
Here is an example Plug that reads an integer from the request body and replies with the corresponding Fibonacci number:
defmodule MyPlug do
  @behaviour Plug

  @impl true
  def init(_opts), do: nil

  @impl true
  def call(conn, _opts) do
    {:ok, body, conn} = Plug.Conn.read_body(conn)
    input = String.to_integer(body)
    output = fib(input)
    resp_body = to_string(output)
    Plug.Conn.send_resp(conn, 200, resp_body)
  end

  defp fib(0), do: 0
  defp fib(1), do: 1
  defp fib(n) when n > 1, do: fib(n - 1) + fib(n - 2)
end
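The plug above calls `String.to_integer/1`, which raises when the body is not a valid integer, and `fib/1` has no clause for negative numbers. As a minimal sketch of how the request handling could be made more defensive, the hypothetical `FibHandler` module below (not part of plug_amqp) maps a raw body to a status and response body using only the Elixir standard library. The 400 status and the error message are assumptions chosen for illustration.

```elixir
defmodule FibHandler do
  # Hypothetical helper, not part of plug_amqp: turns a raw request body
  # into a {status, response_body} tuple without raising on bad input.
  def handle(body) when is_binary(body) do
    case Integer.parse(String.trim(body)) do
      # Accept only a complete, non-negative integer.
      {n, ""} when n >= 0 -> {200, Integer.to_string(fib(n))}
      # Anything else (text, floats, negatives) is rejected.
      _ -> {400, "expected a non-negative integer"}
    end
  end

  # Iterative Fibonacci with two accumulators; runs in linear time.
  defp fib(n), do: fib(n, 0, 1)
  defp fib(0, a, _b), do: a
  defp fib(n, a, b), do: fib(n - 1, b, a + b)
end
```

Inside `call/2`, the result could then be passed along with something like `{status, resp_body} = FibHandler.handle(body)` followed by `Plug.Conn.send_resp(conn, status, resp_body)`.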
Then we can start sending messages to the server through AMQP:
# Setting up the RPC request queue
{:ok, conn} = AMQP.Connection.open()
{:ok, chan} = AMQP.Channel.open(conn)
{:ok, _info} = AMQP.Queue.declare(chan, "rpc_queue")

# Starting the server
{:ok, _adapter} = Plug.AMQP.start_link(consumer_queue: "rpc_queue", plug: MyPlug)

# Setting up the client: an exclusive, server-named queue receives the replies
{:ok, %{queue: callback_queue}} = AMQP.Queue.declare(chan, "", exclusive: true)
{:ok, _ctag} = AMQP.Basic.consume(chan, callback_queue, nil, no_ack: true)

# Sending a request
IO.puts("Sending a 30 as a request")
AMQP.Basic.publish(chan, "", "rpc_queue", "30", reply_to: callback_queue)

# Waiting for a response
receive do
  {:basic_deliver, response, _meta} ->
    IO.puts("Got a #{response} response")
end
See the Plug.AMQP module in the online documentation for more information. Also, take a look at the examples in the examples folder.