Skip to content

Commit

Permalink
Validate unique options at job and worker level
Browse files Browse the repository at this point in the history
  • Loading branch information
sorentwo committed Jul 31, 2019
1 parent c2e1074 commit 625f6aa
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 15 deletions.
41 changes: 32 additions & 9 deletions lib/oban/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,6 @@ defmodule Oban.Job do

@required ~w(worker args)a

@default_unique_fields ~w(args queue worker)a
@default_unique_period 60
@default_unique_states ~w(available scheduled)a

@doc """
Construct a new job changeset ready for insertion into the database.
Expand Down Expand Up @@ -161,6 +157,17 @@ defmodule Oban.Job do
end
end

@unique_fields ~w(args queue worker)a
@unique_period 60
@unique_states ~w(available scheduled executing retryable completed)a

@doc false
@spec valid_unique_opt?({:fields | :period | :states, [atom()] | integer()}) :: boolean()
def valid_unique_opt?({:fields, [_ | _] = fields}), do: fields -- @unique_fields == []
def valid_unique_opt?({:period, period}), do: is_integer(period) and period > 0
def valid_unique_opt?({:states, [_ | _] = states}), do: states -- @unique_states == []
def valid_unique_opt?(_option), do: false

defp put_scheduling(changeset, value) do
case value do
in_seconds when is_integer(in_seconds) ->
Expand All @@ -180,15 +187,21 @@ defmodule Oban.Job do

defp put_uniqueness(changeset, value) do
case value do
[{key, _val} | _] = opts when key in [:fields, :period, :states] ->
[_ | _] = opts ->
unique =
opts
|> Keyword.put_new(:fields, @default_unique_fields)
|> Keyword.put_new(:period, @default_unique_period)
|> Keyword.put_new(:states, @default_unique_states)
|> Keyword.put_new(:fields, @unique_fields)
|> Keyword.put_new(:period, @unique_period)
|> Keyword.put_new(:states, @unique_states)
|> Map.new()

put_change(changeset, :unique, unique)
case validate_unique_opts(unique) do
:ok ->
put_change(changeset, :unique, unique)

{:error, field, value} ->
add_error(changeset, :unique, "invalid unique option for #{field}, #{inspect(value)}")
end

nil ->
changeset
Expand All @@ -198,6 +211,16 @@ defmodule Oban.Job do
end
end

defp validate_unique_opts(unique) do
Enum.reduce_while(unique, :ok, fn {key, val}, _acc ->
if valid_unique_opt?({key, val}) do
{:cont, :ok}
else
{:halt, {:error, key, val}}
end
end)
end

defp to_clean_string(value) do
value
|> to_string()
Expand Down
28 changes: 25 additions & 3 deletions lib/oban/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,15 @@ defmodule Oban.Worker do

@doc false
defmacro __using__(opts) do
Enum.each(opts, &validate_opt!/1)

quote location: :keep do
alias Oban.{Job, Worker}

@before_compile Worker
@behaviour Worker

@opts unquote(opts)
|> Keyword.take([:queue, :max_attempts, :unique])
|> Keyword.put(:worker, to_string(__MODULE__))
@opts Keyword.put(unquote(opts), :worker, to_string(__MODULE__))

@impl Worker
def new(args, opts \\ []) when is_map(args) and is_list(opts) do
Expand All @@ -177,4 +177,26 @@ defmodule Oban.Worker do
def default_backoff(attempt, base_backoff \\ 15) when is_integer(attempt) do
trunc(:math.pow(2, attempt) + base_backoff)
end

defp validate_opt!({:max_attempts, max_attempts}) do
unless is_integer(max_attempts) and max_attempts > 1 do
raise ArgumentError, "expected :max_attempts to be an integer greater than 1"
end
end

defp validate_opt!({:queue, queue}) do
unless is_atom(queue) or is_binary(queue) do
raise ArgumentError, "expected :queue to be an atom or a binary"
end
end

defp validate_opt!({:unique, unique}) do
unless is_list(unique) and Enum.all?(unique, &Job.valid_unique_opt?/1) do
raise ArgumentError, "unexpected unique options: #{inspect(unique)}"
end
end

defp validate_opt!(option) do
raise ArgumentError, "unknown option provided #{inspect(option)}"
end
end
9 changes: 7 additions & 2 deletions test/oban/job_test.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Oban.JobTest do
use Oban.Case, async: true

describe "new/2" do
describe "scheduling with new/2" do
test "scheduling a job for the future using :schedule_in" do
changeset = Job.new(%{}, worker: Fake, schedule_in: 10)

Expand All @@ -14,14 +14,16 @@ defmodule Oban.JobTest do
assert Job.new(%{}, worker: Fake, schedule_in: "10").errors[:schedule_in]
assert Job.new(%{}, worker: Fake, schedule_in: 0.12).errors[:schedule_in]
end
end

describe "unique constraints with new/2" do
test "marking a job as unique by setting the period provides defaults" do
changeset = Job.new(%{}, worker: Fake, unique: [period: 60])

assert changeset.changes[:unique] == %{
fields: [:args, :queue, :worker],
period: 60,
states: [:available, :scheduled]
states: [:available, :scheduled, :executing, :retryable, :completed]
}
end

Expand All @@ -39,6 +41,9 @@ defmodule Oban.JobTest do
assert Job.new(%{}, worker: Fake, unique: true).errors[:unique]
assert Job.new(%{}, worker: Fake, unique: []).errors[:unique]
assert Job.new(%{}, worker: Fake, unique: [special: :value]).errors[:unique]
assert Job.new(%{}, worker: Fake, unique: [fields: [:bogus]]).errors[:unique]
assert Job.new(%{}, worker: Fake, unique: [period: :infinity]).errors[:unique]
assert Job.new(%{}, worker: Fake, unique: [states: [:random]]).errors[:unique]
end
end
end
39 changes: 38 additions & 1 deletion test/oban/worker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ defmodule Oban.WorkerTest do
end

defmodule CustomWorker do
use Worker, queue: "special", max_attempts: 5, unique: [period: 60]
use Worker,
queue: "special",
max_attempts: 5,
unique: [fields: [:queue, :worker], period: 60, states: [:scheduled]]

@impl Worker
def backoff(attempt), do: attempt * attempt
Expand Down Expand Up @@ -63,4 +66,38 @@ defmodule Oban.WorkerTest do
assert 4 == CustomWorker.perform(%Job{attempt: 4, args: %{a: 2, b: 3}})
end
end

describe "validating __using__ macro options" do
assert_raise ArgumentError, fn ->
defmodule BrokenModule, do: use(Oban.Worker, state: "youcantsetthis")
end

assert_raise ArgumentError, fn ->
defmodule BrokenModule, do: use(Oban.Worker, queue: 1234)
end

assert_raise ArgumentError, fn ->
defmodule BrokenModule, do: use(Oban.Worker, max_attempts: 0)
end

assert_raise ArgumentError, fn ->
defmodule BrokenModule, do: use(Oban.Worker, unique: 0)
end

assert_raise ArgumentError, fn ->
defmodule BrokenModule, do: use(Oban.Worker, unique: [unknown: []])
end

assert_raise ArgumentError, fn ->
defmodule BrokenModule, do: use(Oban.Worker, unique: [fields: [:unknown]])
end

assert_raise ArgumentError, fn ->
defmodule BrokenModule, do: use(Oban.Worker, unique: [period: 0])
end

assert_raise ArgumentError, fn ->
defmodule BrokenModule, do: use(Oban.Worker, unique: [states: [:unknown]])
end
end
end

0 comments on commit 625f6aa

Please sign in to comment.