Skip to content

Commit

Permalink
Use a telemetry event to log stager changes
Browse files Browse the repository at this point in the history
Aside from an instrumentable event, the new logs are structured for
consistent parsing on external aggregators.
  • Loading branch information
sorentwo committed Apr 17, 2023
1 parent 4330866 commit 8274a6e
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 28 deletions.
11 changes: 2 additions & 9 deletions lib/oban/stager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ defmodule Oban.Stager do

alias Oban.{Engine, Job, Notifier, Peer, Plugin, Repo}

require Logger

@type option :: Plugin.option() | {:interval, pos_integer()}

defmodule State do
Expand Down Expand Up @@ -93,7 +91,7 @@ defmodule Oban.Stager do

def handle_info({:notification, :stager, _payload}, %State{} = state) do
if state.mode == :local do
Logger.info("Oban job staging switched back to global mode.")
:telemetry.execute([:oban, :stager, :switch], %{}, %{conf: state.conf, mode: :global})
end

{:noreply, %{state | ping_at_tick: 60, mode: :global, swap_at_tick: 65, tick: 0}}
Expand Down Expand Up @@ -154,12 +152,7 @@ defmodule Oban.Stager do
end

if state.mode == :global and state.tick == state.swap_at_tick do
Logger.info("""
Oban job staging switched to local mode.
Local mode places more load on the database because each queue polls for jobs every second.
To restore global mode, ensure notifications work or switch to the PG notifier.
""")
:telemetry.execute([:oban, :stager, :switch], %{}, %{conf: state.conf, mode: :local})

%{state | mode: :local}
else
Expand Down
84 changes: 66 additions & 18 deletions lib/oban/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ defmodule Oban.Telemetry do
* `[:oban, :peer, :election, :start | :stop | :exception]`
| event | measures | metadata |
| ------------ | -------------- | ------------------------------------------------------- |
| `:start` | `:system_time` | `:conf`, `:leader`, `:peer`, |
| `:stop` | `:duration` | `:conf`, `:leader`, `:peer`, |
| ------------ | -------------- | -------------------------------------------------------------- |
| `:start` | `:system_time` | `:conf`, `:leader`, `:peer`, |
| `:stop` | `:duration` | `:conf`, `:leader`, `:peer`, |
| `:exception` | `:duration` | `:conf`, `:leader`, `:peer`, `:kind`, `:reason`, `:stacktrace` |
### Metadata
Expand All @@ -169,6 +169,21 @@ defmodule Oban.Telemetry do
* `:leader` — whether the peer is the current leader
* `:peer` — the module used for peering
## Stager Events
Oban emits an event any time the Stager switches between `local` and `global` modes:
* `[:oban, :stager, :switch]`
| event | measures | metadata |
| ------------ | --------- | ---------------- |
| `:switch` | | `:conf`, `:mode` |
### Metadata
* `:conf` — see the explanation in metadata above
* `:mode` — either `local` for polling mode or `global` for the more efficient pub-sub mode
## Default Logger
A default log handler that emits structured JSON is provided, see `attach_default_logger/0` for
Expand Down Expand Up @@ -241,12 +256,12 @@ defmodule Oban.Telemetry do
@doc """
Attaches a default structured JSON Telemetry handler for logging.
This function attaches a handler that outputs logs with the following fields:
This function attaches a handler that outputs logs with the following fields for job events:
* `args` — a map of the job's raw arguments
* `attempt` — the job's execution attempt
* `duration` — the job's runtime duration, in the native time unit
* `event` — either `job:stop` or `job:exception` depending on reporting telemetry event
* `event` — one of `job:start`, `job:stop`, or `job:exception`, depending on the reporting telemetry event
* `error` — a formatted error banner, without the extended stacktrace
* `id` — the job's id
* `meta` — a map of the job's raw metadata
Expand All @@ -258,10 +273,16 @@ defmodule Oban.Telemetry do
* `tags` — the job's tags
* `worker` — the job's worker module
## Options
And the following fields for stager events:
* `event` — always `stager:switch`
* `message` — information about the mode switch
* `mode` — either `"local"` or `"global"`
* `source` — always "oban"
* `:level` — The log level to use for logging output, defaults to `:info`.
## Options
* `:level` — The log level to use for logging output, defaults to `:info`
* `:encode` — Whether to encode log output as JSON, defaults to `true`
## Examples
Expand Down Expand Up @@ -290,7 +311,8 @@ defmodule Oban.Telemetry do
events = [
[:oban, :job, :start],
[:oban, :job, :stop],
[:oban, :job, :exception]
[:oban, :job, :exception],
[:oban, :stager, :switch]
]

opts =
Expand Down Expand Up @@ -324,37 +346,63 @@ defmodule Oban.Telemetry do
@doc false
@spec handle_event([atom()], map(), map(), Keyword.t()) :: :ok
def handle_event([:oban, :job, event], measure, meta, opts) do
level = Keyword.fetch!(opts, :level)

Logger.log(level, fn ->
log(opts, fn ->
details = Map.take(meta.job, ~w(attempt args id max_attempts meta queue tags worker)a)

timing =
extra =
case event do
:start ->
%{system_time: measure.system_time}
%{event: "job:start", system_time: measure.system_time}

:stop ->
%{
duration: convert(measure.duration),
event: "job:stop",
queue_time: convert(measure.queue_time),
state: meta.state
}

:exception ->
%{
error: Exception.format_banner(meta.kind, meta.reason, meta.stacktrace),
event: "job:exception",
duration: convert(measure.duration),
queue_time: convert(measure.queue_time),
state: meta.state
}
end

output =
details
|> Map.put(:event, "job:#{event}")
|> Map.put(:source, "oban")
|> Map.merge(timing)
Map.merge(details, extra)
end)
end

# Handles stager mode-switch events: picks the mode label and message for the
# new mode, then hands the resulting fields to `log/2` for output.
def handle_event([:oban, :stager, :switch], _measure, %{mode: mode}, opts) do
  log(opts, fn ->
    {label, message} =
      case mode do
        :local ->
          {"local",
           "job staging switched to local mode. local mode polls for jobs for every queue; " <>
             "restore global mode with a functional notifier"}

        :global ->
          {"global", "job staging switched back to global mode"}
      end

    %{event: "stager:switch", mode: label, message: message}
  end)
end

defp log(opts, fun) do
level = Keyword.fetch!(opts, :level)

Logger.log(level, fn ->
output = Map.put(fun.(), :source, "oban")

if Keyword.fetch!(opts, :encode) do
Jason.encode_to_iodata!(output)
Expand Down
5 changes: 4 additions & 1 deletion test/oban/stager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ defmodule Oban.StagerTest do
|> Enum.sort()
end

@tag :capture_log
test "switching to local mode without functional pubsub" do
ref = :telemetry_test.attach_event_handlers(self(), [[:oban, :stager, :switch]])

name =
start_supervised_oban!(
stage_interval: 2,
Expand All @@ -45,6 +46,8 @@ defmodule Oban.StagerTest do
Registry.register(Oban.Registry, prod_name, nil)

assert_receive {:notification, :insert, %{"queue" => "staging_test"}}

assert_receive {[:oban, :stager, :switch], ^ref, %{}, %{mode: :local}}
end
end
end
15 changes: 15 additions & 0 deletions test/oban/telemetry_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,21 @@ defmodule Oban.TelemetryTest do
Telemetry.detach_default_logger()
end

# Verifies that the default logger handler emits a log entry for
# `[:oban, :stager, :switch]` events, checking the captured output for the
# expected `source`, `event`, and `message` fields.
test "the default handler logs stager switch events" do
:ok = Telemetry.attach_default_logger(:warn)

# Emit the event directly so the test doesn't depend on a running stager.
logged =
capture_log(fn ->
:telemetry.execute([:oban, :stager, :switch], %{}, %{mode: :local})
end)

# Only the start of the message is matched, so the rest of the text may vary.
assert logged =~ ~s("source":"oban")
assert logged =~ ~s("event":"stager:switch")
assert logged =~ ~s("message":"job staging switched to local mode)
after
# Always detach so the handler does not leak into other tests.
Telemetry.detach_default_logger()
end

test "detaching the logger prevents logging" do
:ok = Telemetry.attach_default_logger(:warn)

Expand Down

0 comments on commit 8274a6e

Please sign in to comment.