Skip to content

Commit

Permalink
Wrap worker crashes and tuples in exceptions
Browse files Browse the repository at this point in the history
This makes several significant changes to how crashes, errors and
timeouts are reported from `perform/1` calls:

* Timeouts are wrapped in `Oban.TimeoutError`
* Error and discard tuples are wrapped in `Oban.PerformError`
* Exits and throws are wrapped in `Oban.CrashError`
* Stacktraces are only included from code that is rescued or caught, not
  from error tuples or timeouts.

The goal is to improve error formatting within a job's error array and to
make error reporting to external services like Sentry entirely
consistent.

Fixes #305
  • Loading branch information
sorentwo committed Aug 7, 2020
1 parent 601fd5a commit a832b6b
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 20 deletions.
19 changes: 19 additions & 0 deletions CHANGELOG.md
Expand Up @@ -32,6 +32,25 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
default of "None" is used. All discarded jobs will have an error now, whether
discarded manually or automatically.

### Changed

- [Oban.Worker] Wrap `{:error, reason}` and `{:discard, reason}` in a proper
`Oban.PerformError` exception with a customized message. This ensures that the
`:error` value passed to telemetry handlers is an exception and not a raw
term.

- [Oban.Worker] Wrap job timeouts in `Oban.TimeoutError` with a customized
message indicating the worker and timeout value. This replaces the raw
`:timeout` atom that was reported before.

- [Oban.Worker] Wrap caught exits and throws in `Oban.CrashError` with a
formatted message. This means the `:error` value passed to telemetry is
_always_ a proper exception and easier to report.

- [Oban.Worker] Stop reporting internal stacktraces for timeouts, discards or
error tuples. The stacktrace was useless and potentially misleading as it
appeared that the error originated from Oban rather than the worker module.

## [2.0.0] — 2020-07-10

No changes from [2.0.0-rc.3][].
Expand Down
12 changes: 12 additions & 0 deletions lib/oban/crash_error.ex
@@ -0,0 +1,12 @@
defmodule Oban.CrashError do
  @moduledoc """
  An exception that wraps an exit or throw caught while a job was executing.

  The exception is built from a `{kind, reason, stacktrace}` tuple and its message is the
  banner produced by `Exception.format_banner/3` for those values.
  """

  defexception [:message]

  # Builds the exception from the caught `kind`, `reason` and `stacktrace`. Only the formatted
  # banner is stored; the stacktrace itself is not kept on the struct.
  @impl Exception
  def exception({kind, reason, stacktrace}) do
    banner = Exception.format_banner(kind, reason, stacktrace)

    %__MODULE__{message: banner}
  end
end
14 changes: 14 additions & 0 deletions lib/oban/perform_error.ex
@@ -0,0 +1,14 @@
defmodule Oban.PerformError do
  @moduledoc """
  An exception that wraps the reason from an `{:error, reason}` or `{:discard, reason}` tuple.

  The exception is built from a `{worker, reason}` tuple; its message names the worker and
  includes the inspected reason.
  """

  alias Oban.Worker

  defexception [:message]

  # Builds the exception from the worker and the reason it returned. The reason is rendered
  # with `inspect/1`, so any term is accepted.
  @impl Exception
  def exception({worker, reason}) do
    worker_name = Worker.to_string(worker)
    inspected = inspect(reason)

    %__MODULE__{message: "#{worker_name} failed with #{inspected}"}
  end
end
29 changes: 15 additions & 14 deletions lib/oban/queue/executor.ex
Expand Up @@ -3,7 +3,7 @@ defmodule Oban.Queue.Executor do

require Logger

alias Oban.{Breaker, Config, Job, Query, Worker}
alias Oban.{Breaker, Config, CrashError, Job, PerformError, Query, TimeoutError, Worker}

@type success :: {:success, Job.t()}
@type failure :: {:failure, Job.t(), Worker.t(), atom(), term()}
Expand All @@ -30,7 +30,6 @@ defmodule Oban.Queue.Executor do
:conf,
:error,
:job,
:kind,
:meta,
:snooze,
:start_mono,
Expand All @@ -39,6 +38,7 @@ defmodule Oban.Queue.Executor do
:worker,
safe: true,
duration: 0,
kind: :error,
queue_time: 0,
stacktrace: [],
state: :unset
Expand Down Expand Up @@ -147,9 +147,14 @@ defmodule Oban.Queue.Executor do

defp perform_inline(%{safe: true} = exec) do
perform_inline(%{exec | safe: false})
rescue
error ->
%{exec | state: :failure, error: error, stacktrace: __STACKTRACE__}
catch
kind, value ->
%{exec | state: :failure, kind: kind, error: value, stacktrace: __STACKTRACE__}
error = CrashError.exception({kind, value, __STACKTRACE__})

%{exec | state: :failure, error: error, stacktrace: __STACKTRACE__}
end

defp perform_inline(%{worker: worker, job: job} = exec) do
Expand All @@ -161,13 +166,13 @@ defmodule Oban.Queue.Executor do
%{exec | state: :success}

:discard ->
%{exec | state: :discard, kind: :error, error: "None", stacktrace: current_stacktrace()}
%{exec | state: :discard, error: PerformError.exception({worker, :discard})}

{:discard, reason} ->
%{exec | state: :discard, kind: :error, error: reason, stacktrace: current_stacktrace()}
%{exec | state: :discard, error: PerformError.exception({worker, reason})}

{:error, error} ->
%{exec | state: :failure, kind: :error, error: error, stacktrace: current_stacktrace()}
{:error, reason} ->
%{exec | state: :failure, error: PerformError.exception({worker, reason})}

{:snooze, seconds} ->
%{exec | state: :snoozed, snooze: seconds}
Expand Down Expand Up @@ -204,7 +209,9 @@ defmodule Oban.Queue.Executor do
reply

nil ->
%{exec | state: :failure, kind: :error, error: :timeout, stacktrace: current_stacktrace()}
error = TimeoutError.exception({exec.worker, timeout})

%{exec | state: :failure, error: error}
end
end

Expand All @@ -226,12 +233,6 @@ defmodule Oban.Queue.Executor do
:telemetry.execute([:oban, :job, :exception], measurements, meta)
end

defp current_stacktrace do
self()
|> Process.info(:current_stacktrace)
|> elem(1)
end

defp event_metadata(conf, job) do
job
|> Map.take([:id, :args, :queue, :worker, :attempt, :max_attempts])
Expand Down
14 changes: 14 additions & 0 deletions lib/oban/timeout_error.ex
@@ -0,0 +1,14 @@
defmodule Oban.TimeoutError do
  @moduledoc """
  An exception returned when a job is terminated early because it exceeded a custom timeout.

  The exception is built from a `{worker, timeout}` tuple; its message names the worker and
  reports the timeout value followed by `ms`.
  """

  alias Oban.Worker

  defexception [:message]

  # Builds the exception from the worker and the timeout value that elapsed.
  @impl Exception
  def exception({worker, timeout}) do
    worker_name = Worker.to_string(worker)

    %__MODULE__{message: "#{worker_name} timed out after #{timeout}ms"}
  end
end
6 changes: 3 additions & 3 deletions test/integration/telemetry_test.exs
Expand Up @@ -3,7 +3,7 @@ defmodule Oban.Integration.TelemetryTest do

import ExUnit.CaptureLog

alias Oban.Telemetry
alias Oban.{PerformError, Telemetry}

@moduletag :integration

Expand Down Expand Up @@ -59,8 +59,8 @@ defmodule Oban.Integration.TelemetryTest do
attempt: 1,
max_attempts: 20,
kind: :error,
error: "ERROR",
stacktrace: [_ | _]
error: %PerformError{},
stacktrace: []
} = error_meta

:ok = stop_supervised(Oban)
Expand Down
2 changes: 1 addition & 1 deletion test/integration/timeouts_test.exs
Expand Up @@ -12,6 +12,6 @@ defmodule Oban.Integration.TimeoutsTest do
refute_receive {:ok, 1}

assert %Job{state: "retryable", errors: [%{"error" => error}]} = Repo.reload(job)
assert error =~ "Erlang error: :timeout"
assert error == "** (Oban.TimeoutError) Oban.Integration.Worker timed out after 20ms"
end
end
5 changes: 3 additions & 2 deletions test/oban/queue/executor_test.exs
Expand Up @@ -3,6 +3,7 @@ defmodule Oban.Queue.ExecutorTest do

import ExUnit.CaptureLog

alias Oban.{CrashError, PerformError}
alias Oban.Queue.Executor

defmodule Worker do
Expand All @@ -26,8 +27,8 @@ defmodule Oban.Queue.ExecutorTest do

test "raising, catching and error tuples are failures" do
assert %{state: :failure} = call_with_mode("raise")
assert %{state: :failure, error: :no_reason} = call_with_mode("catch")
assert %{state: :failure, error: "no reason"} = call_with_mode("error")
assert %{state: :failure, error: %CrashError{}} = call_with_mode("catch")
assert %{state: :failure, error: %PerformError{}} = call_with_mode("error")
end

test "inability to resolve a worker is a failure" do
Expand Down

0 comments on commit a832b6b

Please sign in to comment.