Skip to content

Commit

Permalink
refactor telemetry integration to be more consistent with the community
Browse files Browse the repository at this point in the history
  • Loading branch information
swelham committed Jul 15, 2019
1 parent b45aff4 commit 7589d76
Show file tree
Hide file tree
Showing 15 changed files with 101 additions and 87 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ The full list of configuration options are available in the [Configuration](conf

By default Faktory Worker will not output any log messages but instead emit events using the [Telemetry](https://github.com/beam-telemetry/telemetry) library.

To enable the built in logging you will need to attach the `FaktoryWorker.EventLogger` to Telemetry. The ideal place to do this is in your `Application.start/2` callback.
To enable the built in logging you will need to attach the default Telemetry handler provided by FaktoryWorker. The ideal place to do this is in your `Application.start/2` callback.

```elixir
defmodule MyApp.Application do
use Application

def start(_, _) do
FaktoryWorker.EventLogger.attach()
FaktoryWorker.attach_default_telemetry_handler()

...
end
Expand Down
13 changes: 13 additions & 0 deletions lib/faktory_worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,17 @@ defmodule FaktoryWorker do
start: {Supervisor, :start_link, [children, [strategy: :one_for_one]]}
}
end

@doc """
Attaches the default telemetry handler provided by FaktoryWorker.
This function attaches the default telemetry handler provided by FaktoryWorker that
outputs log messages for each of the events emitted by FaktoryWorker.
For a full list of events see the [Logging](logging.html) documentation.
"""
@spec attach_default_telemetry_handler :: :ok | {:error, :already_exists}
def attach_default_telemetry_handler() do
FaktoryWorker.Telemetry.attach_default_handler()
end
end
12 changes: 0 additions & 12 deletions lib/faktory_worker/event_dispatcher.ex

This file was deleted.

4 changes: 2 additions & 2 deletions lib/faktory_worker/push_pipeline/acknowledger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule FaktoryWorker.PushPipeline.Acknowledger do
@moduledoc false

alias FaktoryWorker.Job
alias FaktoryWorker.EventDispatcher
alias FaktoryWorker.Telemetry

@behaviour Broadway.Acknowledger

Expand All @@ -12,7 +12,7 @@ defmodule FaktoryWorker.PushPipeline.Acknowledger do
end

defp handle_failed_message(%{status: {:error, :not_unique}, data: {_, job}}) do
EventDispatcher.dispatch_event(:push, {:error, :not_unique}, job)
Telemetry.execute(:push, {:error, :not_unique}, job)
end

defp handle_failed_message(%{data: {pipeline, payload}}) do
Expand Down
4 changes: 2 additions & 2 deletions lib/faktory_worker/push_pipeline/consumer.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule FaktoryWorker.PushPipeline.Consumer do
@moduledoc false

alias FaktoryWorker.EventDispatcher
alias FaktoryWorker.Telemetry
alias FaktoryWorker.{ConnectionManager, Pool}

@behaviour Broadway
Expand All @@ -26,7 +26,7 @@ defmodule FaktoryWorker.PushPipeline.Consumer do
end

defp send_command_result({:ok, _}, job) do
EventDispatcher.dispatch_event(:push, :ok, job)
Telemetry.execute(:push, :ok, job)
end

defp send_command_result({:error, reason}, _), do: {:error, reason}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
defmodule FaktoryWorker.EventLogger do
@moduledoc """
Handles all of the Telemetry events emitted by Faktory Worker and outputs a log message
when receiving an event.
"""
defmodule FaktoryWorker.Telemetry do
@moduledoc false

require Logger

@events [:push, :beat, :fetch, :ack, :failed_ack]

@doc """
Attaches the `FaktoryWorker.EventLogger` to Telemetry.
Once attached Faktory Worker will start outputting log messages using the `Logger` module for
each event emitted.
For a full list of events see the [Logging](logging.html) documentation.
"""
@spec attach :: :ok | {:error, :already_exists}
def attach() do
@doc false
@spec attach_default_handler :: :ok | {:error, :already_exists}
def attach_default_handler() do
events = Enum.map(@events, &[:faktory_worker, &1])

:telemetry.attach_many(:faktory_worker_logger, events, &__MODULE__.handle_event/4, [])
end

@doc false
@spec execute(event :: atom(), outcome :: term(), metadata :: map()) :: :ok
def execute(event, outcome, metadata) do
:telemetry.execute(
[:faktory_worker, event],
%{status: outcome},
metadata
)
end

@doc false
def handle_event([:faktory_worker, event], measurements, metadata, _config) do
log_event(event, measurements, metadata)
Expand Down
8 changes: 4 additions & 4 deletions lib/faktory_worker/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule FaktoryWorker.Worker do
require Logger

alias FaktoryWorker.ConnectionManager
alias FaktoryWorker.EventDispatcher
alias FaktoryWorker.Telemetry
alias FaktoryWorker.ErrorFormatter
alias FaktoryWorker.QueueManager

Expand Down Expand Up @@ -156,7 +156,7 @@ defmodule FaktoryWorker.Worker do
def handle_fetch_response({:ok, _}, state), do: schedule_fetch(state)

def handle_fetch_response({:error, reason}, state) do
EventDispatcher.dispatch_event(:fetch, {:error, reason}, %{wid: state.process_wid})
Telemetry.execute(:fetch, {:error, reason}, %{wid: state.process_wid})
Process.send_after(self(), :fetch, state.retry_interval)
state
end
Expand All @@ -176,7 +176,7 @@ defmodule FaktoryWorker.Worker do
end

defp handle_ack_response({:ok, _}, ack_type, state) do
EventDispatcher.dispatch_event(:ack, ack_type, %{
Telemetry.execute(:ack, ack_type, %{
jid: state.job_id,
args: state.job["args"],
jobtype: state.job["jobtype"]
Expand All @@ -196,7 +196,7 @@ defmodule FaktoryWorker.Worker do
end

defp handle_ack_response({:error, _}, ack_type, state) do
EventDispatcher.dispatch_event(:failed_ack, ack_type, %{
Telemetry.execute(:failed_ack, ack_type, %{
jid: state.job_id,
args: state.job["args"],
jobtype: state.job["jobtype"]
Expand Down
6 changes: 3 additions & 3 deletions lib/faktory_worker/worker/hearbeat_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule FaktoryWorker.Worker.HeartbeatServer do

use GenServer

alias FaktoryWorker.EventDispatcher
alias FaktoryWorker.Telemetry
alias FaktoryWorker.ConnectionManager
alias FaktoryWorker.Worker.Server
alias FaktoryWorker.Worker.Pool
Expand Down Expand Up @@ -87,7 +87,7 @@ defmodule FaktoryWorker.Worker.HeartbeatServer do
def terminate(_, _), do: :ok

defp handle_beat_response({{:ok, %{"state" => new_beat_state}}, conn}, state) do
EventDispatcher.dispatch_event(:beat, :ok, %{
Telemetry.execute(:beat, :ok, %{
prev_status: state.beat_state,
wid: state.process_wid
})
Expand All @@ -102,7 +102,7 @@ defmodule FaktoryWorker.Worker.HeartbeatServer do
end

defp handle_beat_response({{result, _}, conn}, state) when result in [:ok, :error] do
EventDispatcher.dispatch_event(:beat, result, %{
Telemetry.execute(:beat, result, %{
prev_status: state.beat_state,
wid: state.process_wid
})
Expand Down
2 changes: 1 addition & 1 deletion logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

Faktory Worker uses the [Telemetry](https://github.com/beam-telemetry/telemetry) library to emit events that can be handled by either Faktory Worker or the consuming application. This provides the flexibility for the consumer of Faktory Worker to choose how logging should behave.

By default Faktory Worker will not output any log messages. Logging needs to be explicitly enabled by calling the `FaktoryWorker.EventLogger.attach/0` function.
By default Faktory Worker will not output any log messages. Logging needs to be explicitly enabled by calling the `FaktoryWorker.attach_default_telemetry_handler/0` function.

Events can also be handled in the consuming application by manually attaching to Telemetry specifying the list of Faktory Worker events to handle. For instructions on creating a custom event handler see the Telemetry [README](https://github.com/beam-telemetry/telemetry/blob/master/README.md).

Expand Down
22 changes: 0 additions & 22 deletions test/faktory_worker/event_dispatcher_test.exs

This file was deleted.

2 changes: 1 addition & 1 deletion test/faktory_worker/push_pipeline/acknowledger_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule FaktoryWorker.PushPipeline.AcknowledgerTest do
alias FaktoryWorker.PushPipeline.Acknowledger

describe "ack/3" do
test "should dispatch the job not unique event" do
test "should execute the job not unique event" do
event_handler_id = attach_event_handler([:push])

jid = Random.job_id()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
defmodule FaktoryWorker.EventLoggerTest do
defmodule FaktoryWorker.TelemetryTest do
use ExUnit.Case

import ExUnit.CaptureLog
import FaktoryWorker.EventHandlerTestHelpers

alias FaktoryWorker.Random
alias FaktoryWorker.EventLogger
alias FaktoryWorker.Telemetry

describe "attach/3" do
test "should attach the event logger to telemetry" do
EventLogger.attach()
describe "attach_default_handler/0" do
test "should attach the default FaktoryWorker telemetry handler" do
Telemetry.attach_default_handler()

event_handlers = :telemetry.list_handlers([:faktory_worker])

Expand All @@ -28,7 +28,7 @@ defmodule FaktoryWorker.EventLoggerTest do
handled_events = Enum.map(event_handlers, & &1.event_name)

assert handler_name == :faktory_worker_logger
assert handler_function == (&FaktoryWorker.EventLogger.handle_event/4)
assert handler_function == (&FaktoryWorker.Telemetry.handle_event/4)
assert Enum.member?(handled_events, [:faktory_worker, :push])
assert Enum.member?(handled_events, [:faktory_worker, :beat])
assert Enum.member?(handled_events, [:faktory_worker, :fetch])
Expand All @@ -39,6 +39,21 @@ defmodule FaktoryWorker.EventLoggerTest do
end
end

describe "execute/3" do
test "should execute an event to telemetry" do
event_handler_id = attach_event_handler([:test_event])

Telemetry.execute(:test_event, "test outcome", %{metadata: "test"})

assert_receive {event, outcome, metadata}
assert event == [:faktory_worker, :test_event]
assert outcome == %{status: "test outcome"}
assert metadata == %{metadata: "test"}

detach_event_handler(event_handler_id)
end
end

describe "handle_event/4" do
test "should log a successful push event" do
outcome = %{status: :ok}
Expand All @@ -51,7 +66,7 @@ defmodule FaktoryWorker.EventLoggerTest do

log_message =
capture_log(fn ->
EventLogger.handle_event([:faktory_worker, :push], outcome, metadata, [])
Telemetry.handle_event([:faktory_worker, :push], outcome, metadata, [])
end)

assert log_message =~
Expand All @@ -71,7 +86,7 @@ defmodule FaktoryWorker.EventLoggerTest do

log_message =
capture_log(fn ->
EventLogger.handle_event([:faktory_worker, :push], outcome, metadata, [])
Telemetry.handle_event([:faktory_worker, :push], outcome, metadata, [])
end)

assert log_message =~
Expand All @@ -90,7 +105,7 @@ defmodule FaktoryWorker.EventLoggerTest do

log_message =
capture_log(fn ->
EventLogger.handle_event([:faktory_worker, :beat], outcome, metadata, [])
Telemetry.handle_event([:faktory_worker, :beat], outcome, metadata, [])
end)

assert log_message =~ "[faktory-worker] Heartbeat Succeeded wid-#{metadata.wid}"
Expand All @@ -106,7 +121,7 @@ defmodule FaktoryWorker.EventLoggerTest do

log_message =
capture_log(fn ->
EventLogger.handle_event([:faktory_worker, :beat], outcome, metadata, [])
Telemetry.handle_event([:faktory_worker, :beat], outcome, metadata, [])
end)

assert log_message =~ "[faktory-worker] Heartbeat Failed wid-#{metadata.wid}"
Expand All @@ -122,7 +137,7 @@ defmodule FaktoryWorker.EventLoggerTest do

log_message =
capture_log(fn ->
EventLogger.handle_event([:faktory_worker, :beat], outcome, metadata, [])
Telemetry.handle_event([:faktory_worker, :beat], outcome, metadata, [])
end)

assert log_message == ""
Expand All @@ -137,7 +152,7 @@ defmodule FaktoryWorker.EventLoggerTest do

log_message =
capture_log(fn ->
EventLogger.handle_event([:faktory_worker, :fetch], outcome, metadata, [])
Telemetry.handle_event([:faktory_worker, :fetch], outcome, metadata, [])
end)

assert log_message =~
Expand All @@ -157,7 +172,7 @@ defmodule FaktoryWorker.EventLoggerTest do

log_message =
capture_log(fn ->
EventLogger.handle_event([:faktory_worker, :ack], outcome, metadata, [])
Telemetry.handle_event([:faktory_worker, :ack], outcome, metadata, [])
end)

assert log_message =~
Expand All @@ -177,7 +192,7 @@ defmodule FaktoryWorker.EventLoggerTest do

log_message =
capture_log(fn ->
EventLogger.handle_event([:faktory_worker, :ack], outcome, metadata, [])
Telemetry.handle_event([:faktory_worker, :ack], outcome, metadata, [])
end)

assert log_message =~
Expand All @@ -197,7 +212,7 @@ defmodule FaktoryWorker.EventLoggerTest do

log_message =
capture_log(fn ->
EventLogger.handle_event([:faktory_worker, :failed_ack], outcome, metadata, [])
Telemetry.handle_event([:faktory_worker, :failed_ack], outcome, metadata, [])
end)

assert log_message =~
Expand All @@ -217,7 +232,7 @@ defmodule FaktoryWorker.EventLoggerTest do

log_message =
capture_log(fn ->
EventLogger.handle_event([:faktory_worker, :failed_ack], outcome, metadata, [])
Telemetry.handle_event([:faktory_worker, :failed_ack], outcome, metadata, [])
end)

assert log_message =~
Expand Down
2 changes: 1 addition & 1 deletion test/faktory_worker/worker/heartbeat_server_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ defmodule FaktoryWorker.Worker.HeartbeatServerTest do
assert state.beat_state == :terminate
end

test "should dispatch an event when the beat outcome changes" do
test "should execute an event when the beat outcome changes" do
event_handler_id = attach_event_handler([:beat])

process_wid = Random.process_wid()
Expand Down
Loading

0 comments on commit 7589d76

Please sign in to comment.