Skip to content

Commit

Permalink
Add documentation to Basic.consume
Browse files Browse the repository at this point in the history
Add documentation to Connection.open
Minor documentation improvements
  • Loading branch information
pma committed Jan 29, 2015
1 parent caff663 commit ce8a527
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 130 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ iex> AMQP.Basic.publish chan, "test_exchange", "", "Hello, World!"
Hello, World!
```

### Slightly more advanced usage: Setup a consumer GenServer
### Setup a consumer GenServer

```elixir
defmodule Consumer do
Expand Down Expand Up @@ -84,17 +84,17 @@ defmodule Consumer do
end

# Confirmation sent by the broker after registering this process as a consumer
def handle_info({:basic_consume_ok, consumer_tag}, chan) do
def handle_info({:basic_consume_ok, %{consumer_tag: consumer_tag}}, chan) do
{:noreply, chan}
end

# Sent by the broker when the consumer is unexpectedly cancelled (such as after a queue deletion)
def handle_info({:basic_cancel, consumer_tag}, chan) do
def handle_info({:basic_cancel, %{consumer_tag: consumer_tag}}, chan) do
{:stop, :normal, chan}
end

# Confirmation sent by the broker to the consumer process after a Basic.cancel
def handle_info({:basic_cancel_ok, consumer_tag}, chan) do
def handle_info({:basic_cancel_ok, %{consumer_tag: consumer_tag}}, chan) do
{:noreply, chan}
end

Expand Down
2 changes: 2 additions & 0 deletions lib/amqp.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
defmodule AMQP do
@moduledoc false

defmacro __using__(_opts) do
quote do
alias AMQP.Connection
Expand Down
139 changes: 76 additions & 63 deletions lib/amqp/basic.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
defmodule AMQP.Basic do
require Logger

@moduledoc """
Functions to publish, consume and acknowledge messages.
"""
Expand All @@ -21,29 +19,34 @@ defmodule AMQP.Basic do
empty string, it publishes to the default exchange.
The `routing_key` parameter specifies the routing key for the message.
The `payload` parameter specifies the message content as a binary..
In addition to the previous parameters, , the following options can be used:
* `mandatory`: If set, returns an error if the broker can't route the message to a queue
* `immediate`: If set, returns an error if the broker can't deliver te message to a consumer immediately.
Additional Basic properties can be set using the following options:
* `content_type`: MIME Content type
* `content_encoding`: MIME Content encoding
* `headers`: Message headers. Can be used with headers Exchanges
* `persistent`: If set, uses persistent delivery mode. Messages marked as `persistent` that are
delivered to `durable` queues will be logged to disk
* `correlation_id`: application correlation identifier
* `priority`: message priority, ranging from 0 to 9
* `reply_to`: name of the reply queue
* `expiration`: how long the message is valid (in milliseconds)
* `message_id`: message identifier
* `timestamp`: timestamp associated with this message (epoch time)
* `type`: message type as a string
* `user_id`: creating user ID. RabbitMQ will validate this against the active connection user
* `app_id`: publishing application ID
The `payload` parameter specifies the message content as a binary.
In addition to the previous parameters, the following options can be used:
# Options
* `:mandatory` - If set, returns an error if the broker can't route the message to a queue (default `false`);
* `:immediate` - If set, returns an error if the broker can't deliver te message to a consumer immediately (default `false`);
* `:content_type` - MIME Content type;
* `:content_encoding` - MIME Content encoding;
* `:headers` - Message headers. Can be used with headers Exchanges;
* `:persistent` - If set, uses persistent delivery mode. Messages marked as `persistent` that are delivered to `durable` \
queues will be logged to disk;
* `:correlation_id` - application correlation identifier;
* `:priority` - message priority, ranging from 0 to 9;
* `:reply_to` - name of the reply queue;
* `:expiration` - how long the message is valid (in milliseconds);
* `:message_id` - message identifier;
* `:timestamp` - timestamp associated with this message (epoch time);
* `:type` - message type as a string;
* `:user_id` - creating user ID. RabbitMQ will validate this against the active connection user;
* `:app_id` - publishing application ID.
## Examples
iex> AMQP.Basic.publish chan, \"my_exchange\", \"my_routing_key\", \"Hello World!\", persistent: true
:ok
"""
def publish(%Channel{pid: pid}, exchange, routing_key, payload, options \\ []) do
basic_publish =
Expand Down Expand Up @@ -186,12 +189,22 @@ delivered to `durable` queues will be logged to disk

@doc """
Registers a queue consumer process. The `pid` of the process can be set using
the `handler` option and defaults to the calling process.
the `consumer_pid` argument and defaults to the calling process.
The consumer process will receive the following data structures:
* `{:basic_deliver, payload, meta}` - This is sent for each message consumed, where \
`payload` contains the message content and `meta` contains all the metadata set when \
sending with Basic.publish or additional info set by the broker;
* `{:basic_consume_ok, %{consumer_tag: consumer_tag}}` - Sent when the consumer \
process is registered with Basic.consume. The caller receives the same information \
as the return of Basic.consume;
* `{:basic_cancel, %{consumer_tag: consumer_tag, no_wait: no_wait}}` - Sent by the \
broker when the consumer is unexpectedly cancelled (such as after a queue deletion)
* `{:basic_cancel_ok, %{consumer_tag: consumer_tag}}` - Sent to the consumer process after a call to Basic.cancel
The handler process will receive the following data structures and should as
process messages.
"""
def consume(%Channel{} = chan, queue, consumer \\ nil, options \\ []) do
def consume(%Channel{} = chan, queue, consumer_pid \\ nil, options \\ []) do
basic_consume =
basic_consume(queue: queue,
consumer_tag: Keyword.get(options, :consumer_tag, ""),
Expand All @@ -201,30 +214,30 @@ delivered to `durable` queues will be logged to disk
nowait: Keyword.get(options, :no_wait, false),
arguments: Keyword.get(options, :arguments, []))

consumer = consumer || self()
consumer_pid = consumer_pid || self()

response_mapper = spawn fn ->
adapter_pid = spawn fn ->
Process.flag(:trap_exit, true)
Process.monitor(consumer)
Process.monitor(consumer_pid)
Process.monitor(chan.pid)
do_start_consumer(chan, consumer)
do_start_consumer(chan, consumer_pid)
end

basic_consume_ok(consumer_tag: consumer_tag) =
:amqp_channel.subscribe(chan.pid, basic_consume, response_mapper)
:amqp_channel.subscribe(chan.pid, basic_consume, adapter_pid)

{:ok, consumer_tag}
end

defp do_start_consumer(chan, consumer) do
defp do_start_consumer(chan, consumer_pid) do
receive do
basic_consume_ok(consumer_tag: consumer_tag) ->
send consumer, {:basic_consume_ok, consumer_tag}
do_consume(chan, consumer, consumer_tag)
send consumer_pid, {:basic_consume_ok, %{consumer_tag: consumer_tag}}
do_consume(chan, consumer_pid, consumer_tag)
end
end

defp do_consume(chan, consumer, consumer_tag) do
defp do_consume(chan, consumer_pid, consumer_tag) do
receive do
{basic_deliver(consumer_tag: consumer_tag,
delivery_tag: delivery_tag,
Expand All @@ -245,34 +258,34 @@ delivered to `durable` queues will be logged to disk
user_id: user_id,
app_id: app_id,
cluster_id: cluster_id), payload: payload)} ->
send consumer, {:basic_deliver, payload, %{consumer_tag: consumer_tag,
delivery_tag: delivery_tag,
redelivered: redelivered,
exchange: exchange,
routing_key: routing_key,
content_type: content_type,
content_encoding: content_encoding,
headers: headers,
persistent: delivery_mode == 2,
priority: priority,
correlation_id: correlation_id,
reply_to: reply_to,
expiration: expiration,
message_id: message_id,
timestamp: timestamp,
type: type,
user_id: user_id,
app_id: app_id,
cluster_id: cluster_id}}
do_consume(chan, consumer, consumer_tag)
send consumer_pid, {:basic_deliver, payload, %{consumer_tag: consumer_tag,
delivery_tag: delivery_tag,
redelivered: redelivered,
exchange: exchange,
routing_key: routing_key,
content_type: content_type,
content_encoding: content_encoding,
headers: headers,
persistent: delivery_mode == 2,
priority: priority,
correlation_id: correlation_id,
reply_to: reply_to,
expiration: expiration,
message_id: message_id,
timestamp: timestamp,
type: type,
user_id: user_id,
app_id: app_id,
cluster_id: cluster_id}}
do_consume(chan, consumer_pid, consumer_tag)
basic_consume_ok(consumer_tag: consumer_tag) ->
send consumer, {:basic_consume_ok, consumer_tag}
do_consume(chan, consumer, consumer_tag)
send consumer_pid, {:basic_consume_ok, %{consumer_tag: consumer_tag}}
do_consume(chan, consumer_pid, consumer_tag)
basic_cancel_ok(consumer_tag: consumer_tag) ->
send consumer, {:basic_cancel_ok, consumer_tag}
send consumer_pid, {:basic_cancel_ok, %{consumer_tag: consumer_tag}}
basic_cancel(consumer_tag: consumer_tag, nowait: no_wait) ->
send consumer, {:basic_cancel, %{consumer_tag: consumer_tag, no_wait: no_wait}}
{:DOWN, _ref, :process, ^consumer, reason} ->
send consumer_pid, {:basic_cancel, %{consumer_tag: consumer_tag, no_wait: no_wait}}
{:DOWN, _ref, :process, ^consumer_pid, reason} ->
cancel(chan, consumer_tag)
exit(reason)
{:DOWN, _ref, :process, _pid, reason} ->
Expand Down
9 changes: 5 additions & 4 deletions lib/amqp/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ defmodule AMQP.Channel do
Functions to operate on Channels.
"""

alias __MODULE__
alias AMQP.Connection
alias AMQP.Channel

defstruct [:conn, :pid]

Expand All @@ -21,6 +21,7 @@ defmodule AMQP.Channel do
@doc """
Closes an open Channel.
"""
def close(%Channel{pid: pid}), do: :amqp_channel.close(pid)

end
def close(%Channel{pid: pid}) do
:amqp_channel.close(pid)
end
end
1 change: 1 addition & 0 deletions lib/amqp/confirm.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule AMQP.Confirm do
"""

import AMQP.Core

alias AMQP.Channel

@doc """
Expand Down
Loading

0 comments on commit ce8a527

Please sign in to comment.