Skip to content
This repository has been archived by the owner on Nov 27, 2023. It is now read-only.

Commit

Permalink
Merge 538eedf into 7d88f8b
Browse files Browse the repository at this point in the history
  • Loading branch information
akoutmos committed Jun 1, 2020
2 parents 7d88f8b + 538eedf commit c82bb1e
Show file tree
Hide file tree
Showing 10 changed files with 297 additions and 272 deletions.
9 changes: 2 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ GenRMQ provides the following functionality:
- `GenRMQ.Consumer` - a behaviour for implementing RabbitMQ consumers ([example][example_consumer])
- `GenRMQ.Publisher` - a behaviour for implementing RabbitMQ publishers ([example][example_publisher])
- `GenRMQ.Processor` - a behaviour for implementing RabbitMQ message processors (this is useful to separate out business logic from your consumer) ([example][example_processor])
- `GenRMQ.Consumer.Telemetry` - telemetry events emitted by a GenRMQ consumer
- `GenRMQ.Publisher.Telemetry` - telemetry events emitted by a GenRMQ publisher
- `GenRMQ.RabbitCase` - test utilities for RabbitMQ ([example][example_rabbit_case])

## Installation
Expand Down Expand Up @@ -117,11 +119,6 @@ GenRMQ.Publisher.publish(Publisher, Jason.encode!(%{msg: "msg"}))
- [Consumer without deadletter configuration][without_deadletter_configuration]
- [Consumer with quorum queues][with_quorum_queue_type]

### Metrics

- [Consumer Telemetry events][consumer_telemetry_events]
- [Publisher Telemetry events][publisher_telemetry_events]

## Running Tests

You need [docker-compose][docker_compose] installed.
Expand Down Expand Up @@ -181,8 +178,6 @@ Copyright (c) 2018 - 2020 Meltwater Inc. [underthehood.meltwater.com][undertheho
[guide_consumer_with_custom_queue_configuration]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/with_custom_queue_configuration.md
[without_deadletter_configuration]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/without_deadletter_configuration.md
[with_quorum_queue_type]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/with_quorum_queue_type.md
[consumer_telemetry_events]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/consumer/telemetry_events.md
[publisher_telemetry_events]: https://github.com/meltwater/gen_rmq/blob/master/documentation/guides/publisher/telemetry_events.md
[trusted_commiters]: https://github.com/meltwater/gen_rmq/blob/master/TRUSTED-COMMITTERS.md
[code_owners]: https://github.com/meltwater/gen_rmq/blob/master/.github/CODEOWNERS
[license]: https://github.com/meltwater/gen_rmq/blob/master/LICENSE
50 changes: 0 additions & 50 deletions documentation/guides/consumer/telemetry_events.md

This file was deleted.

35 changes: 0 additions & 35 deletions documentation/guides/publisher/telemetry_events.md

This file was deleted.

126 changes: 20 additions & 106 deletions lib/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,14 @@ defmodule GenRMQ.Consumer do
use AMQP

require Logger
alias GenRMQ.{Message, MessageTask}
alias GenRMQ.Consumer.QueueConfiguration

alias GenRMQ.Consumer.{
MessageTask,
QueueConfiguration,
Telemetry
}

alias GenRMQ.Message

##############################################################################
# GenRMQ.Consumer callbacks
Expand Down Expand Up @@ -278,7 +284,7 @@ defmodule GenRMQ.Consumer do
"""
@spec ack(message :: %GenRMQ.Message{}) :: :ok
def ack(%Message{state: %{in: channel}, attributes: %{delivery_tag: tag}} = message) do
emit_message_ack_event(message)
Telemetry.emit_message_ack_event(message)

Basic.ack(channel, tag)
end
Expand All @@ -292,7 +298,7 @@ defmodule GenRMQ.Consumer do
"""
@spec reject(message :: %GenRMQ.Message{}, requeue :: boolean) :: :ok
def reject(%Message{state: %{in: channel}, attributes: %{delivery_tag: tag}} = message, requeue \\ false) do
emit_message_reject_event(message, requeue)
Telemetry.emit_message_reject_event(message, requeue)

Basic.reject(channel, tag, requeue: requeue)
end
Expand Down Expand Up @@ -353,15 +359,15 @@ defmodule GenRMQ.Consumer do
# Cancel timeout timer, emit telemetry event, and invoke user's `handle_error` callback
Process.cancel_timer(timeout_reference)
updated_state = %{state | running_tasks: Map.delete(running_tasks, ref)}
emit_message_error_event(module, reason, message, start_time)
Telemetry.emit_message_error_event(module, reason, message, start_time)
apply(module, :handle_error, [message, reason])

{:noreply, updated_state}

_ ->
Logger.info("[#{module}]: RabbitMQ connection is down! Reason: #{inspect(reason)}")

emit_connection_down_event(module, reason)
Telemetry.emit_connection_down_event(module, reason)

config
|> Keyword.get(:reconnect, true)
Expand Down Expand Up @@ -516,9 +522,9 @@ defmodule GenRMQ.Consumer do
fn ->
start_time = System.monotonic_time()

emit_message_start_event(start_time, message, module)
Telemetry.emit_message_start_event(message, module)
result = apply(module, :handle_message, [message])
emit_message_stop_event(start_time, message, module)
Telemetry.emit_message_stop_event(start_time, message, module)

result
end,
Expand All @@ -528,17 +534,17 @@ defmodule GenRMQ.Consumer do

defp handle_message(message, %{module: module}) do
start_time = System.monotonic_time()
emit_message_start_event(start_time, message, module)
Telemetry.emit_message_start_event(message, module)

try do
result = apply(module, :handle_message, [message])
emit_message_stop_event(start_time, message, module)
Telemetry.emit_message_stop_event(start_time, message, module)

result
rescue
reason ->
full_error = {reason, __STACKTRACE__}
emit_message_error_event(module, full_error, message, start_time)
Telemetry.emit_message_error_event(module, full_error, message, start_time)
apply(module, :handle_error, [message, full_error])
:error
end
Expand Down Expand Up @@ -566,11 +572,11 @@ defmodule GenRMQ.Consumer do
exchange = config[:exchange]
routing_key = config[:routing_key]

emit_connection_start_event(start_time, module, attempt, queue, exchange, routing_key)
Telemetry.emit_connection_start_event(module, attempt, queue, exchange, routing_key)

case Connection.open(config[:connection]) do
{:ok, conn} ->
emit_connection_stop_event(start_time, module, attempt, queue, exchange, routing_key)
Telemetry.emit_connection_stop_event(start_time, module, attempt, queue, exchange, routing_key)
Process.monitor(conn.pid)
Map.put(state, :conn, conn)

Expand All @@ -580,7 +586,7 @@ defmodule GenRMQ.Consumer do
"#{inspect(strip_key(config, :connection))}, reason #{inspect(e)}"
)

emit_connection_error_event(start_time, module, attempt, queue, exchange, routing_key, e)
Telemetry.emit_connection_error_event(start_time, module, attempt, queue, exchange, routing_key, e)

retry_delay_fn = config[:retry_delay_function] || (&linear_delay/1)
next_attempt = attempt + 1
Expand Down Expand Up @@ -632,98 +638,6 @@ defmodule GenRMQ.Consumer do
GenRMQ.Binding.bind_exchange_and_queue(chan, exchange, name, routing_key)
end

defp emit_message_ack_event(message) do
start_time = System.monotonic_time()
measurements = %{time: start_time}
metadata = %{message: message}

:telemetry.execute([:gen_rmq, :consumer, :message, :ack], measurements, metadata)
end

defp emit_message_reject_event(message, requeue) do
start_time = System.monotonic_time()
measurements = %{time: start_time}
metadata = %{message: message, requeue: requeue}

:telemetry.execute([:gen_rmq, :consumer, :message, :reject], measurements, metadata)
end

defp emit_message_start_event(start_time, message, module) do
measurements = %{time: start_time}
metadata = %{message: message, module: module}

:telemetry.execute([:gen_rmq, :consumer, :message, :start], measurements, metadata)
end

defp emit_message_stop_event(start_time, message, module) do
stop_time = System.monotonic_time()
measurements = %{time: stop_time, duration: stop_time - start_time}
metadata = %{message: message, module: module}

:telemetry.execute([:gen_rmq, :consumer, :message, :stop], measurements, metadata)
end

defp emit_message_error_event(module, reason, message, start_time) do
stop_time = System.monotonic_time()
measurements = %{time: stop_time, duration: stop_time - start_time}
metadata = %{module: module, reason: reason, message: message}

:telemetry.execute([:gen_rmq, :consumer, :message, :error], measurements, metadata)
end

defp emit_connection_down_event(module, reason) do
start_time = System.monotonic_time()
measurements = %{time: start_time}
metadata = %{module: module, reason: reason}

:telemetry.execute([:gen_rmq, :consumer, :connection, :down], measurements, metadata)
end

defp emit_connection_start_event(start_time, module, attempt, queue, exchange, routing_key) do
measurements = %{time: start_time}

metadata = %{
module: module,
attempt: attempt,
queue: queue,
exchange: exchange,
routing_key: routing_key
}

:telemetry.execute([:gen_rmq, :consumer, :connection, :start], measurements, metadata)
end

defp emit_connection_stop_event(start_time, module, attempt, queue, exchange, routing_key) do
stop_time = System.monotonic_time()
measurements = %{time: stop_time, duration: stop_time - start_time}

metadata = %{
module: module,
attempt: attempt,
queue: queue,
exchange: exchange,
routing_key: routing_key
}

:telemetry.execute([:gen_rmq, :consumer, :connection, :stop], measurements, metadata)
end

defp emit_connection_error_event(start_time, module, attempt, queue, exchange, routing_key, error) do
stop_time = System.monotonic_time()
measurements = %{time: stop_time, duration: stop_time - start_time}

metadata = %{
module: module,
attempt: attempt,
queue: queue,
exchange: exchange,
routing_key: routing_key,
error: error
}

:telemetry.execute([:gen_rmq, :consumer, :connection, :error], measurements, metadata)
end

defp strip_key(keyword_list, key) do
keyword_list
|> Keyword.delete(key)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule GenRMQ.MessageTask do
defmodule GenRMQ.Consumer.MessageTask do
@moduledoc """
Struct wrapping details of a Task that is executing the configured
`handle_message` callback
Expand Down

0 comments on commit c82bb1e

Please sign in to comment.