Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow defining a job priority [implements #56] #57

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 19 additions & 8 deletions lib/rihanna/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ defmodule Rihanna.Job do
:enqueued_at,
:due_at,
:failed_at,
:fail_reason
:fail_reason,
:priority
]

defstruct @fields
Expand All @@ -87,6 +88,14 @@ defmodule Rihanna.Job do
GenServer.call(Rihanna.JobManager, job)
end

@doc """
The priority of this job.

Conforms to the niceness values used in Linux processes — lower values
are more important.
"""
def priority, do: 19

@doc false
def enqueue(term, due_at \\ nil) do
serialized_term = :erlang.term_to_binary(term)
Expand All @@ -96,11 +105,11 @@ defmodule Rihanna.Job do
result =
producer_query(
"""
INSERT INTO "#{table()}" (term, enqueued_at, due_at)
VALUES ($1, $2, $3)
INSERT INTO "#{table()}" (term, enqueued_at, due_at, priority)
VALUES ($1, $2, $3, $4)
RETURNING #{@sql_fields}
""",
[serialized_term, now, due_at]
[serialized_term, now, due_at, priority()]
)

case result do
Expand Down Expand Up @@ -134,15 +143,17 @@ defmodule Rihanna.Job do
enqueued_at,
due_at,
failed_at,
fail_reason
fail_reason,
priority
]) do
%__MODULE__{
id: id,
term: :erlang.binary_to_term(serialized_term),
enqueued_at: enqueued_at,
due_at: due_at,
failed_at: failed_at,
fail_reason: fail_reason
fail_reason: fail_reason,
priority: priority
}
end

Expand Down Expand Up @@ -252,7 +263,7 @@ defmodule Rihanna.Job do
WHERE NOT (id = ANY($3))
AND (due_at IS NULL OR due_at <= now())
AND failed_at IS NULL
ORDER BY enqueued_at, j.id
ORDER BY priority, enqueued_at, j.id
FOR UPDATE OF j SKIP LOCKED
LIMIT 1
) AS t1
Expand All @@ -266,7 +277,7 @@ defmodule Rihanna.Job do
AND (due_at IS NULL OR due_at <= now())
AND failed_at IS NULL
AND (j.enqueued_at, j.id) > (jobs.enqueued_at, jobs.id)
ORDER BY enqueued_at, j.id
ORDER BY priority, enqueued_at, j.id
FOR UPDATE OF j SKIP LOCKED
LIMIT 1
) AS j
Expand Down
3 changes: 2 additions & 1 deletion lib/rihanna/migration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ defmodule Rihanna.Migration do
CREATE TABLE #{table_name} (
id int NOT NULL,
term bytea NOT NULL,
priority integer NOT NULL DEFAULT 19,
enqueued_at timestamp with time zone NOT NULL,
due_at timestamp with time zone,
failed_at timestamp with time zone,
Expand Down Expand Up @@ -135,7 +136,7 @@ defmodule Rihanna.Migration do
ADD CONSTRAINT #{table_name}_pkey PRIMARY KEY (id);
""",
"""
CREATE INDEX #{table_name}_enqueued_at_id ON #{table_name} (enqueued_at ASC, id ASC);
CREATE INDEX #{table_name}_enqueued_at_id ON #{table_name} (priority ASC, enqueued_at ASC, id ASC);
"""
]
end
Expand Down
22 changes: 22 additions & 0 deletions lib/rihanna/migration/upgrade.ex
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ defmodule Rihanna.Migration.Upgrade do
""",
"""
ALTER TABLE #{table_name} DROP COLUMN rihanna_internal_meta;
""",
"""
ALTER TABLE #{table_name} DROP COLUMN priority;
""",
"""
DO $$
BEGIN
DROP INDEX IF EXISTS rihanna_jobs_enqueued_at_id;
CREATE INDEX IF NOT EXISTS rihanna_jobs_enqueued_at_id ON rihanna_jobs (enqueued_at ASC, id ASC);
END;
$$
"""
]
end
Expand Down Expand Up @@ -116,6 +127,17 @@ defmodule Rihanna.Migration.Upgrade do
""",
"""
CREATE INDEX IF NOT EXISTS rihanna_jobs_enqueued_at_id ON rihanna_jobs (enqueued_at ASC, id ASC);
""",
"""
ALTER TABLE #{table_name} ADD COLUMN priority integer NOT NULL DEFAULT 19;
""",
"""
DO $$
BEGIN
DROP INDEX IF EXISTS rihanna_jobs_enqueued_at_id;
CREATE INDEX IF NOT EXISTS rihanna_jobs_enqueued_at_id ON rihanna_jobs (priority ASC, enqueued_at ASC, id ASC);
END;
$$
"""
]
end
Expand Down
8 changes: 7 additions & 1 deletion test/rihanna/job_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ defmodule Rihanna.JobTest do
[job] ++
[
insert_job(pg, :ready_to_run),
insert_job(pg, :ready_to_run)
insert_job(pg, :ready_to_run_highest_priority)
]

{:ok, %{jobs: jobs}}
Expand All @@ -102,6 +102,12 @@ defmodule Rihanna.JobTest do
assert length(locked) == 3
end

test "locks all available jobs, ordered with the highest priority first", %{pg: pg} do
[ first_job | _rest ] = lock(pg, 4)

assert %Rihanna.Job{priority: -19} = first_job
end

test "locks all available jobs if equal to N", %{pg: pg, jobs: jobs} do
locked = lock(pg, 3)

Expand Down
7 changes: 7 additions & 0 deletions test/rihanna_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ defmodule RihannaTest do
assert job.due_at |> is_nil
assert job.fail_reason |> is_nil
assert job.failed_at |> is_nil
assert job.priority == 19
assert job.term == @term
end

Expand All @@ -33,6 +34,7 @@ defmodule RihannaTest do
assert job.due_at |> is_nil
assert job.fail_reason |> is_nil
assert job.failed_at |> is_nil
assert job.priority == 19
assert job.term == @term
end

Expand Down Expand Up @@ -67,6 +69,7 @@ defmodule RihannaTest do
assert job.due_at |> is_nil
assert job.fail_reason |> is_nil
assert job.failed_at |> is_nil
assert job.priority == 19
assert job.term == {Rihanna.Mocks.MockJob, :arg}
end

Expand All @@ -80,6 +83,7 @@ defmodule RihannaTest do
assert job.due_at |> is_nil
assert job.fail_reason |> is_nil
assert job.failed_at |> is_nil
assert job.priority == 19
assert job.term == {Rihanna.Mocks.MockJob, :arg}
end

Expand Down Expand Up @@ -122,6 +126,7 @@ defmodule RihannaTest do
assert job.due_at |> is_nil
assert job.fail_reason |> is_nil
assert job.failed_at |> is_nil
assert job.priority == 19
assert job.term == {Rihanna.Mocks.MockJob, :arg}

assert {:ok, _} = Rihanna.delete(job.id)
Expand All @@ -144,6 +149,7 @@ defmodule RihannaTest do
assert job.due_at |> is_nil
assert job.fail_reason |> is_nil
assert job.failed_at |> is_nil
assert job.priority == 19
assert job.term == {Rihanna.Mocks.MockJob, :arg}

assert {:ok, _} = Rihanna.delete(job.id)
Expand All @@ -166,6 +172,7 @@ defmodule RihannaTest do
fail_reason: nil,
failed_at: nil,
id: 1,
priority: 19,
term: {Rihanna.Mocks.MockJob, :arg}
}} = Rihanna.delete(job.id)

Expand Down
27 changes: 22 additions & 5 deletions test/support/test_helper.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ defmodule TestHelper do
exec.(
conn,
"""
SELECT id, term, enqueued_at, due_at, failed_at, fail_reason FROM "rihanna_jobs" WHERE id = $1
SELECT id, term, enqueued_at, due_at, failed_at, fail_reason, priority FROM "rihanna_jobs" WHERE id = $1
""",
[id]
)
Expand All @@ -81,7 +81,24 @@ defmodule TestHelper do
"""
INSERT INTO "rihanna_jobs" (term, enqueued_at)
VALUES ($1, '2018-01-01')
RETURNING id, term, enqueued_at, due_at, failed_at, fail_reason
RETURNING id, term, enqueued_at, due_at, failed_at, fail_reason, priority
""",
[:erlang.term_to_binary(@test_term)]
)

[job] = Rihanna.Job.from_sql(result.rows)

job
end

def insert_job(pg, :ready_to_run_highest_priority) do
result =
Postgrex.query!(
pg,
"""
INSERT INTO "rihanna_jobs" (term, enqueued_at, priority)
VALUES ($1, '2018-01-01', -19)
RETURNING id, term, enqueued_at, due_at, failed_at, fail_reason, priority
""",
[:erlang.term_to_binary(@test_term)]
)
Expand All @@ -99,7 +116,7 @@ defmodule TestHelper do
"""
INSERT INTO "rihanna_jobs" (term, enqueued_at, due_at)
VALUES ($1, '2018-01-01', now() + interval '1 minute')
RETURNING id, term, enqueued_at, due_at, failed_at, fail_reason
RETURNING id, term, enqueued_at, due_at, failed_at, fail_reason, priority
""",
[:erlang.term_to_binary(@test_term)]
)
Expand All @@ -117,7 +134,7 @@ defmodule TestHelper do
"""
INSERT INTO "rihanna_jobs" (term, enqueued_at, due_at)
VALUES ($1, '2018-01-01', now())
RETURNING id, term, enqueued_at, due_at, failed_at, fail_reason
RETURNING id, term, enqueued_at, due_at, failed_at, fail_reason, priority
""",
[:erlang.term_to_binary(@test_term)]
)
Expand All @@ -139,7 +156,7 @@ defmodule TestHelper do
fail_reason
)
VALUES ($1, '2018-01-01', '2018-01-02', 'Kaboom!')
RETURNING id, term, enqueued_at, due_at, failed_at, fail_reason
RETURNING id, term, enqueued_at, due_at, failed_at, fail_reason, priority
""",
[:erlang.term_to_binary(@test_term)]
)
Expand Down