Skip to content
This repository has been archived by the owner on Nov 27, 2023. It is now read-only.

Commit

Permalink
Merge 9dae0fe into a07e25b
Browse files Browse the repository at this point in the history
  • Loading branch information
mkorszun committed Apr 6, 2020
2 parents a07e25b + 9dae0fe commit e1b63df
Show file tree
Hide file tree
Showing 14 changed files with 359 additions and 72 deletions.
87 changes: 17 additions & 70 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ If you were a very early adopter of gen_rmq (before `v1.0.0`), please check [how

## Examples

More thorough examples for using `GenRMQ.Consumer`, `GenRMQ.Publisher`, and `GenRMQ.Processor` can be found in the [examples][examples] directory.
More thorough examples for using `GenRMQ.Consumer`, `GenRMQ.Publisher`, and `GenRMQ.Processor`
can be found under [documentation](./documentation).

### Consumer

Expand Down Expand Up @@ -99,80 +100,27 @@ GenRMQ.Publisher.start_link(Publisher, name: Publisher)
GenRMQ.Publisher.publish(Publisher, Jason.encode!(%{msg: "msg"}))
```

## Telemetry
## Documentation

GenRMQ emits [Telemetry][telemetry] events for both consumers and publishers.
It currently exposes the following events:
### Examples

- `[:gen_rmq, :publisher, :connection, :start]` - Dispatched by a GenRMQ publisher when a connection to RabbitMQ is started
1. [Consumer](./documentation/examples/consumer.ex)
2. [Publisher](./documentation/examples/publisher.ex)
3. [Processor](./documentation/examples/processor.ex)

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{exchange: String.t}`
### Guides

* `[:gen_rmq, :publisher, :connection, :stop]` - Dispatched by a GenRMQ publisher when a connection to RabbitMQ has been established
1. [Basic consumer setup](./documentation/guides/consumer/basic_setup.md)
2. [Consumer with custom deadletter configuration](./documentation/guides/consumer/with_custom_deadletter_configuration.md)
3. [Consumer with custom exchange type](./documentation/guides/consumer/with_custom_exchange_type.md)
4. [Consumer with custom queue configuration](./documentation/guides/consumer/with_custom_queue_configuration.md)
5. [Consumer without deadletter configuration](./documentation/guides/consumer/without_deadletter_configuration.md)
6. [Consumer with quorum queues](./documentation/guides/consumer/with_quorum_queue_type.md)

- Measurement: `%{time: System.monotonic_time, duration: native_time}`
- Metadata: `%{exchange: String.t}`
### Metrics

* `[:gen_rmq, :publisher, :connection, :down]` - Dispatched by a GenRMQ publisher when a connection to RabbitMQ has been lost

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{module: atom, reason: atom}`

* `[:gen_rmq, :publisher, :message, :start]` - Dispatched by a GenRMQ publisher when a message is about to be published to RabbitMQ

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{exchange: String.t, message: String.t}`

* `[:gen_rmq, :publisher, :message, :stop]` - Dispatched by a GenRMQ publisher when a message has been published to RabbitMQ

- Measurement: `%{time: System.monotonic_time, duration: native_time}`
- Metadata: `%{exchange: String.t, message: String.t}`

* `[:gen_rmq, :publisher, :message, :error]` - Dispatched by a GenRMQ publisher when a message failed to be published to RabbitMQ

- Measurement: `%{time: System.monotonic_time, duration: native_time}`
- Metadata: `%{exchange: String.t, message: String.t, kind: atom, reason: atom}`

* `[:gen_rmq, :consumer, :message, :ack]` - Dispatched by a GenRMQ consumer when a message has been acknowledged

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{message: String.t}`

* `[:gen_rmq, :consumer, :message, :reject]` - Dispatched by a GenRMQ consumer when a message has been rejected

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{message: String.t, requeue: boolean}`

* `[:gen_rmq, :consumer, :message, :start]` - Dispatched by a GenRMQ consumer when the processing of a message has begun

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{message: String.t, module: atom}`

* `[:gen_rmq, :consumer, :message, :stop]` - Dispatched by a GenRMQ consumer when the processing of a message has completed

- Measurement: `%{time: System.monotonic_time, duration: native_time}`
- Metadata: `%{message: String.t, module: atom}`

* `[:gen_rmq, :consumer, :connection, :start]` - Dispatched by a GenRMQ consumer when a connection to RabbitMQ is started

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{module: atom, attempt: integer, queue: String.t, exchange: String.t, routing_key: String.t}`

* `[:gen_rmq, :consumer, :connection, :stop]` - Dispatched by a GenRMQ consumer when a connection to RabbitMQ has been established

- Measurement: `%{time: System.monotonic_time, duration: native_time}`
- Metadata: `%{module: atom, attempt: integer, queue: String.t, exchange: String.t, routing_key: String.t}`

* `[:gen_rmq, :consumer, :connection, :error]` - Dispatched by a GenRMQ consumer when a connection to RabbitMQ could not be made

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{module: atom, attempt: integer, queue: String.t, exchange: String.t, routing_key: String.t, error: any}`

* `[:gen_rmq, :consumer, :connection, :down]` - Dispatched by a GenRMQ consumer when a connection to RabbitMQ has been lost

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{module: atom, reason: atom}`
1. [Consumer Telemetry events](./documentation/guides/consumer/telemetry_events.md)
2. [Publisher Telemetry events](./documentation/guides/publisher/telemetry_events.md)

## Running tests

Expand Down Expand Up @@ -219,7 +167,6 @@ Copyright (c) 2018 - 2020 Meltwater Inc. [underthehood.meltwater.com][undertheho
[amqp]: https://github.com/pma/amqp
[rabbit_case_example]: https://github.com/meltwater/gen_rmq/blob/master/test/gen_rmq_publisher_test.exs
[migrating_to_100]: https://github.com/meltwater/gen_rmq/wiki/Migrations#0---100
[examples]: https://github.com/meltwater/gen_rmq/tree/master/examples
[consumer_doc]: https://github.com/meltwater/gen_rmq/blob/master/lib/consumer.ex
[telemetry]: https://github.com/beam-telemetry/telemetry
[docker_compose]: https://docs.docker.com/compose/
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
37 changes: 37 additions & 0 deletions documentation/guides/consumer/basic_setup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
Consumer basic setup
====================

# Example

~~~elixir
defmodule ConsumerBasicSetup do
@behaviour GenRMQ.Consumer

def init() do
[
connection: "amqp://guest:guest@localhost:5672",
queue: "example_queue",
exchange: "example_exchange",
routing_key: "routing_key.#",
prefetch_count: "10"
]
end

def handle_message(%GenRMQ.Message{} = message), do: GenRMQ.Consumer.ack(message)

def consumer_tag(), do: "consumer-tag"

def start_link(), do: GenRMQ.Consumer.start_link(__MODULE__, name: __MODULE__)
end

~~~

# Outcome:

- durable `example_exchange.deadletter` exchange created or redeclared
- durable `example_queue_error` queue created or redeclared and bound to `example_exchange.deadletter` exchange
- durable topic `example_exchange` exchange created or redeclared
- durable `example_queue` queue created or redeclared and bound to `example_exchange` exchange
- queue `example_queue` has a deadletter exchange set to `example_exchange.deadletter`
- every `handle_message` callback will be executed in a separate process
- on failed rabbitmq connection it will wait for a bit and then reconnect
43 changes: 43 additions & 0 deletions documentation/guides/consumer/telemetry_events.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Consumer Telemetry events

GenRMQ emits [Telemetry][telemetry] events for consumers. It currently exposes the following events:

- `[:gen_rmq, :consumer, :message, :ack]` - Dispatched by a GenRMQ consumer when a message has been acknowledged

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{message: String.t}`

- `[:gen_rmq, :consumer, :message, :reject]` - Dispatched by a GenRMQ consumer when a message has been rejected

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{message: String.t, requeue: boolean}`

- `[:gen_rmq, :consumer, :message, :start]` - Dispatched by a GenRMQ consumer when the processing of a message has begun

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{message: String.t, module: atom}`

- `[:gen_rmq, :consumer, :message, :stop]` - Dispatched by a GenRMQ consumer when the processing of a message has completed

- Measurement: `%{time: System.monotonic_time, duration: native_time}`
- Metadata: `%{message: String.t, module: atom}`

- `[:gen_rmq, :consumer, :connection, :start]` - Dispatched by a GenRMQ consumer when a connection to RabbitMQ is started

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{module: atom, attempt: integer, queue: String.t, exchange: String.t, routing_key: String.t}`

- `[:gen_rmq, :consumer, :connection, :stop]` - Dispatched by a GenRMQ consumer when a connection to RabbitMQ has been established

- Measurement: `%{time: System.monotonic_time, duration: native_time}`
- Metadata: `%{module: atom, attempt: integer, queue: String.t, exchange: String.t, routing_key: String.t}`

- `[:gen_rmq, :consumer, :connection, :error]` - Dispatched by a GenRMQ consumer when a connection to RabbitMQ could not be made

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{module: atom, attempt: integer, queue: String.t, exchange: String.t, routing_key: String.t, error: any}`

- `[:gen_rmq, :consumer, :connection, :down]` - Dispatched by a GenRMQ consumer when a connection to RabbitMQ has been lost

- Measurement: `%{time: System.monotonic_time}`
- Metadata: `%{module: atom, reason: atom}`
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
Consumer with custom deadletter configuration
=============================================

Consumer with custom deadletter:

- queue name
- exchange name
- routing key
- queue arguments and type (transient instead of durable)

`deadletter_queue_options` - Queue options for the deadletter queue as declared in
[AMQP.Queue.declare/3](https://hexdocs.pm/amqp/AMQP.Queue.html#declare/3).

# Example

~~~elixir
defmodule ConsumerWithCustomDeadletterConfiguration do
@behaviour GenRMQ.Consumer

def init() do
[
connection: "amqp://guest:guest@localhost:5672",
queue: "example_queue",
exchange: "example_exchange",
routing_key: "routing_key.#",
prefetch_count: "10",
deadletter_queue: "custom_deadletter_queue",
deadletter_exchange: "custom_deadletter_exchange",
deadletter_routing_key: "custom_deadletter_routing_key",
deadletter_queue_options: [
durable: false,
arguments: [
{"x-expires", :long, 3_600_000},
{"x-max-priority", :long, 3}
]
]
]
end

def handle_message(%GenRMQ.Message{} = message), do: GenRMQ.Consumer.ack(message)

def consumer_tag(), do: "consumer-tag"

def start_link(), do: GenRMQ.Consumer.start_link(__MODULE__, name: __MODULE__)
end
~~~

# Outcome:

- durable `custom_deadletter_exchange` exchange created or redeclared
- transient, priority and with ttl `custom_deadletter_queue` queue created or redeclared and bound to `custom_deadletter_exchange` exchange
- durable topic `example_exchange` exchange created or redeclared
- durable `example_queue` queue created or redeclared and bound to `example_exchange` exchange
- queue `example_queue` has a deadletter exchange set to `custom_deadletter_exchange`
- every `handle_message` callback will be executed in a separate process
- on failed rabbitmq connection it will wait for a bit and then reconnect
39 changes: 39 additions & 0 deletions documentation/guides/consumer/with_custom_exchange_type.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
Consumer with custom exchange
=============================

By default consumer creates `topic` exchanges. This can be modified by
specyfying exchange type: `exchange: {:fanout, "custom_exchange"}`.

# Example

~~~elixir
defmodule WithCustomExchangeType do
@behaviour GenRMQ.Consumer

def init() do
[
connection: "amqp://guest:guest@localhost:5672",
queue: "example_queue",
exchange: {:fanout, "custom_exchange"},
routing_key: "routing_key.#",
prefetch_count: "10"
]
end

def handle_message(%GenRMQ.Message{} = message), do: GenRMQ.Consumer.ack(message)

def consumer_tag(), do: "consumer-tag"

def start_link(), do: GenRMQ.Consumer.start_link(__MODULE__, name: __MODULE__)
end
~~~

# Outcome:

- durable `example_exchange.deadletter` exchange created or redeclared
- durable `example_queue_error` queue created or redeclared and bound to `example_exchange.deadletter` exchange
- durable fanout `custom_exchange` exchange created or redeclared
- durable `example_queue` queue created or redeclared and bound to `custom_exchange` exchange
- queue `example_queue` has a deadletter exchange set to `example_exchange.deadletter`
- every `handle_message` callback will be executed in a separate process
- on failed rabbitmq connection it will wait for a bit and then reconnect
46 changes: 46 additions & 0 deletions documentation/guides/consumer/with_custom_queue_configuration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
Consumer with custom queue options
==================================

`queue_options` - Queue options as declared in
[AMQP.Queue.declare/3](https://hexdocs.pm/amqp/AMQP.Queue.html#declare/3).

# Example

~~~elixir
defmodule ConsumerWithCustomQueueConfiguration do
@behaviour GenRMQ.Consumer

def init() do
[
connection: "amqp://guest:guest@localhost:5672",
queue: "example_queue",
exchange: "example_exchange",
routing_key: "routing_key.#",
prefetch_count: "10",
queue_options: [
durable: false,
arguments: [
{"x-expires", :long, 3_600_000},
{"x-max-priority", :long, 3}
]
]
]
end

def handle_message(%GenRMQ.Message{} = message), do: GenRMQ.Consumer.ack(message)

def consumer_tag(), do: "consumer-tag"

def start_link(), do: GenRMQ.Consumer.start_link(__MODULE__, name: __MODULE__)
end
~~~

# Outcome:

- durable `example_exchange.deadletter` exchange created or redeclared
- durable `example_queue_error` queue created or redeclared and bound to `example_exchange.deadletter` exchange
- durable topic `example_exchange` exchange created or redeclared
- transient, priority and with ttl `example_queue` queue created or redeclared and bound to `example_exchange` exchange
- queue `example_queue` has a deadletter exchange set to `example_exchange.deadletter`
- every `handle_message` callback will be executed in a separate process
- on failed rabbitmq connection it will wait for a bit and then reconnect
49 changes: 49 additions & 0 deletions documentation/guides/consumer/with_quorum_queue_type.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
Consumer with quorum queues
===========================

You can change from `classic` to a `quorum` queue type by specyfing corresponding
queue arguments. Read more about quorum queues [here](https://www.rabbitmq.com/quorum-queues.html).

# Example

~~~elixir
defmodule WithQuorumQueueType do
@behaviour GenRMQ.Consumer

def init() do
[
connection: "amqp://guest:guest@localhost:5672",
queue: "example_queue",
exchange: "example_exchange",
routing_key: "routing_key.#",
prefetch_count: "10",
queue_options: [
arguments: [
{"x-queue-type", :longstr, "quorum"}
]
],
deadletter_queue_options: [
arguments: [
{"x-queue-type", :longstr, "quorum"}
]
]
]
end

def handle_message(%GenRMQ.Message{} = message), do: GenRMQ.Consumer.ack(message)

def consumer_tag(), do: "consumer-tag"

def start_link(), do: GenRMQ.Consumer.start_link(__MODULE__, name: __MODULE__)
end
~~~

# Outcome:

- durable `example_exchange.deadletter` exchange created or redeclared
- durable `example_queue_error` **quorum** queue created or redeclared and bound to `example_exchange.deadletter` exchange
- durable topic `example_exchange` exchange created or redeclared
- durable `example_queue` **quorum** queue created or redeclared and bound to `example_exchange` exchange
- queue `example_queue` has a deadletter exchange set to `example_exchange.deadletter`
- every `handle_message` callback will be executed in a separate process
- on failed rabbitmq connection it will wait for a bit and then reconnect

0 comments on commit e1b63df

Please sign in to comment.