Skip to content

Commit

Permalink
Success mode support (for monitoring, etc)
Browse files Browse the repository at this point in the history
  • Loading branch information
koudelka committed Jul 13, 2017
1 parent e574b4d commit a8d6ad4
Show file tree
Hide file tree
Showing 17 changed files with 117 additions and 42 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ In your mix.exs file:
```elixir
defp deps do
[{:honeydew, "~> 1.0.0-rc7"}]

end
```

Expand Down Expand Up @@ -210,14 +209,19 @@ To cancel a job that hasn't yet run, use `Honeydew.cancel/2`. If the job was suc
There are various options you can pass to `queue_spec/2` and `worker_spec/3`, see the [Honeydew](https://github.com/koudelka/honeydew/blob/master/lib/honeydew.ex) module.

### Failure Modes
When a worker crashes, a monitoring process runs the `handle_failure/4` function from the selected module on the queue's node. Honeydew ships with two failure modes, at present:
When a worker crashes, a monitoring process runs the `handle_failure/3` function from the selected module on the queue's node. Honeydew ships with two failure modes, at present:

- `Honeydew.FailureMode.Abandon`: Simply forgets about the job.
- `Honeydew.FailureMode.Move`: Removes the job from the original queue, and places it on another.
- `Honeydew.FailureMode.Retry`: Re-attempts the job on its original queue a number of times, then calls another failure mode after the final failure.

See `Honeydew.queue_spec/2` to select a failure mode.

### Success Modes
When a job completes successfully, the monitoring process runs the `handle_success/2` function from the selected module on the queue's node. You'll likely want to use this callback for monitoring purposes. You can use a job's `:enqueued_at`, `:started_at` and `:completed_at` fields to calculate various time intervals.

See `Honeydew.queue_spec/2` to select a success mode.

## The Dungeon

### Job Lifecycle
Expand Down
17 changes: 14 additions & 3 deletions lib/honeydew.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@ defmodule Honeydew do
def async(task, queue, opts \\ [])
def async(task, queue, reply: true) do
{:ok, job} =
%Job{task: task, from: {self(), make_ref()}, queue: queue}
task
|> Job.new(queue)
|> struct(from: {self(), make_ref()})
|> enqueue

job
end

def async(task, queue, _opts) do
{:ok, job} =
%Job{task: task, queue: queue}
task
|> Job.new(queue)
|> enqueue

job
Expand Down Expand Up @@ -141,6 +144,7 @@ defmodule Honeydew do
- `queue`: is the module that queue will use, you may also provide init/1 args: {module, args}
- `dispatcher`: the job dispatching strategy, `{module, init_args}`.
- `failure_mode`: the way that failed jobs should be handled. You can pass either a module, or {module, args}, the module must implement the `Honeydew.FailureMode` behaviour. `args` defaults to `[]`.
- `success_mode`: a callback that runs when a job successfully completes. You can pass either a module, or {module, args}, the module must implement the `Honeydew.SuccessMode` behaviour, `args` defaults to `[]``.
- `supervisor_opts`: options accepted by `Supervisor.Spec.supervisor/3`.
For example:
Expand Down Expand Up @@ -168,8 +172,15 @@ defmodule Honeydew do
failure_mode =
case opts[:failure_mode] do
nil -> {Honeydew.FailureMode.Abandon, []}
{module, args} -> {module, args}
module when is_atom(module) -> {module, []}
end

success_mode =
case opts[:success_mode] do
nil -> nil
{module, args} -> {module, args}
module when is_atom(module) -> {module, []}
end

supervisor_opts =
Expand All @@ -181,7 +192,7 @@ defmodule Honeydew do

Supervisor.Spec.supervisor(
Honeydew.QueueSupervisor,
[name, module, args, num, dispatcher, failure_mode],
[name, module, args, num, dispatcher, failure_mode, success_mode],
supervisor_opts)
end

Expand Down
2 changes: 1 addition & 1 deletion lib/honeydew/failure_mode.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule Honeydew.FailureMode do
alias Honeydew.Job

@callback handle_failure(pool :: atom, job :: %Job{}, queue :: atom, args :: list) :: any
@callback handle_failure(job :: %Job{}, reason :: any, args :: list) :: any
end
2 changes: 1 addition & 1 deletion lib/honeydew/failure_mode/abandon.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Honeydew.FailureMode.Abandon do
require Logger
alias Honeydew.Job

# @behaviour Honeydew.FailureMode
@behaviour Honeydew.FailureMode

def handle_failure(%Job{queue: queue, from: from} = job, reason, []) do
Logger.warn "Job failed because #{inspect reason}, abandoning: #{inspect job}"
Expand Down
2 changes: 1 addition & 1 deletion lib/honeydew/failure_mode/move.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Honeydew.FailureMode.Move do
require Logger
alias Honeydew.Job

# @behaviour Honeydew.FailureMode
@behaviour Honeydew.FailureMode

def handle_failure(%Job{queue: queue, from: from} = job, reason, [queue: to_queue]) do
Logger.info "Job failed because #{inspect reason}, moving to #{inspect to_queue}: #{inspect job}"
Expand Down
2 changes: 1 addition & 1 deletion lib/honeydew/failure_mode/retry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Honeydew.FailureMode.Retry do
alias Honeydew.Job
alias Honeydew.FailureMode.Abandon

# @behaviour Honeydew.FailureMode
@behaviour Honeydew.FailureMode

def handle_failure(job, reason, [times: times]), do:
handle_failure(job, reason, [times: times, finally: {Abandon, []}])
Expand Down
9 changes: 8 additions & 1 deletion lib/honeydew/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ defmodule Honeydew.Job do
:result,
:by, # node last processed the job
:queue,
:monitor]
:monitor,
:enqueued_at,
:started_at,
:completed_at]

@kv Enum.map(@fields, &{&1, nil})

Expand All @@ -22,6 +25,10 @@ defmodule Honeydew.Job do
vars = @fields |> Enum.map(&Macro.var(&1, __MODULE__))
vars_keyword_list = Enum.zip(@fields, vars)

def new(task, queue) do
%__MODULE__{task: task, queue: queue, enqueued_at: :erlang.system_time(:millisecond)}
end

def to_record(%{unquote_splicing(vars_keyword_list)}, name) do
{name, unquote_splicing(vars)}
end
Expand Down
18 changes: 12 additions & 6 deletions lib/honeydew/monitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,40 @@ defmodule Honeydew.Monitor do
@claim_delay 5_000 # ms

defmodule State do
defstruct [:queue, :worker, :job, :failure_mode]
defstruct [:queue, :worker, :job, :failure_mode, :success_mode]
end

def start(job, queue, failure_mode) do
GenServer.start(__MODULE__, [job, queue, failure_mode])
def start(job, queue, failure_mode, success_mode) do
GenServer.start(__MODULE__, [job, queue, failure_mode, success_mode])
end

def init([job, queue, failure_mode]) do
def init([job, queue, failure_mode, success_mode]) do
Process.send_after(self(), :return_job, @claim_delay)

{:ok, %State{job: job, queue: queue, failure_mode: failure_mode}}
{:ok, %State{job: job, queue: queue, failure_mode: failure_mode, success_mode: success_mode}}
end

def handle_call({:claim, job}, {worker, _ref}, state) do
Honeydew.debug "[Honeydew] Monitor #{inspect self()} had job #{inspect job.private} claimed by worker #{inspect worker}"
Process.monitor(worker)
job = %{job | started_at: :erlang.system_time(:millisecond)}
{:reply, :ok, %{state | job: job, worker: worker}}
end

def handle_call(:current_job, _from, %State{job: job, worker: worker} = state) do
{:reply, {worker, job}, state}
end

def handle_call(:ack, {worker, _ref}, %State{job: job, queue: queue, worker: worker} = state) do
def handle_call(:ack, {worker, _ref}, %State{job: job, queue: queue, worker: worker, success_mode: success_mode} = state) do
job = %{job | completed_at: :erlang.system_time(:millisecond)}

queue
|> Honeydew.get_queue
|> GenServer.cast({:ack, job})

with {success_mode_module, success_mode_args} <- success_mode,
do: success_mode_module.handle_success(job, success_mode_args)

{:stop, :normal, :ok, %{state | job: nil}}
end

Expand Down
33 changes: 22 additions & 11 deletions lib/honeydew/queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Honeydew.Queue do
dispatcher: nil,
suspended: false,
failure_mode: nil,
success_mode: nil,
monitors: MapSet.new
end

Expand All @@ -19,15 +20,15 @@ defmodule Honeydew.Queue do
# @behaviour Honeydew.Queue
@before_compile unquote(__MODULE__)

def start_link(queue, args, dispatcher, failure_mode) do
GenServer.start_link(__MODULE__, {queue, args, dispatcher, failure_mode})
def start_link(queue, args, dispatcher, failure_mode, success_mode) do
GenServer.start_link(__MODULE__, {queue, args, dispatcher, failure_mode, success_mode})
end

#
# the queue module also has an init/1 that's called with a list, so we use a tuple
# to ensure we match this one.
#
def init({queue, args, {dispatcher, dispatcher_args}, failure_mode}) do
def init({queue, args, {dispatcher, dispatcher_args}, failure_mode, success_mode}) do
queue
|> Honeydew.group(:queues)
|> :pg2.join(self())
Expand All @@ -43,20 +44,24 @@ defmodule Honeydew.Queue do

{:ok, dispatcher_private} = :erlang.apply(dispatcher, :init, dispatcher_args)

{:ok, %State{queue: queue, private: state, failure_mode: failure_mode, dispatcher: {dispatcher, dispatcher_private}}}
{:ok, %State{queue: queue,
private: state,
failure_mode: failure_mode,
success_mode: success_mode,
dispatcher: {dispatcher, dispatcher_private}}}
end

#
# Enqueue/Reserve
#

def handle_call({:enqueue, job}, _from, %State{suspended: true} = state) do
{state, job} = enqueue(state, job)
{state, job} = do_enqueue(job, state)
{:reply, {:ok, job}, state}
end

def handle_call({:enqueue, job}, _from, state) do
{state, job} = enqueue(state, job)
{state, job} = do_enqueue(job, state)
{:reply, {:ok, job}, dispatch(state)}
end

Expand All @@ -81,12 +86,12 @@ defmodule Honeydew.Queue do

def handle_cast({:ack, job}, state) do
Honeydew.debug "[Honeydew] Job #{inspect job.private} acked in #{inspect self()}"
{:noreply, state |> ack(job)}
{:noreply, ack(job, state)}
end

def handle_cast({:nack, job}, state) do
Honeydew.debug "[Honeydew] Job #{inspect job.private} nacked by #{inspect self()}"
{:noreply, state |> nack(job) |> dispatch}
{:noreply, job |> nack(state) |> dispatch}
end

#
Expand Down Expand Up @@ -129,7 +134,7 @@ defmodule Honeydew.Queue do
end

def handle_call({:cancel, job}, _from, %State{private: queue} = state) do
{reply, queue} = cancel(queue, job)
{reply, queue} = cancel(job, queue)
{:reply, reply, %{state | private: queue}}
end

Expand Down Expand Up @@ -178,12 +183,18 @@ defmodule Honeydew.Queue do
{:noreply, state}
end

defp do_enqueue(job, state) do
job
|> struct(enqueued_at: :erlang.system_time(:millisecond))
|> enqueue(state)
end

defp subscribe_workers(workers) do
Enum.each(workers, &GenServer.cast(&1, :subscribe_to_queues))
end

defp send_job(worker, job, %State{queue: queue, failure_mode: failure_mode, monitors: monitors} = state) do
{:ok, monitor} = Monitor.start(job, queue, failure_mode)
defp send_job(worker, job, %State{queue: queue, failure_mode: failure_mode, success_mode: success_mode, monitors: monitors} = state) do
{:ok, monitor} = Monitor.start(job, queue, failure_mode, success_mode)
Process.monitor(monitor)
GenServer.cast(worker, {:run, %{job | monitor: monitor}})
%{state | monitors: MapSet.put(monitors, monitor)}
Expand Down
8 changes: 4 additions & 4 deletions lib/honeydew/queue/erlang_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Honeydew.Queue.ErlangQueue do
# Enqueue/Reservee
#

def enqueue(%State{private: {pending, in_progress}} = state, job) do
def enqueue(job, %State{private: {pending, in_progress}} = state) do
job = %{job | private: :erlang.unique_integer}
{%{state | private: {:queue.in(job, pending), in_progress}}, job}
end
Expand All @@ -30,11 +30,11 @@ defmodule Honeydew.Queue.ErlangQueue do
# Ack/Nack
#

def ack(%State{private: {pending, in_progress}} = state, %Job{private: id}) do
def ack(%Job{private: id}, %State{private: {pending, in_progress}} = state) do
%{state | private: {pending, Map.delete(in_progress, id)}}
end

def nack(%State{private: {pending, in_progress}} = state, %Job{private: id} = job) do
def nack(%Job{private: id} = job, %State{private: {pending, in_progress}} = state) do
%{state | private: {:queue.in_r(job, pending), Map.delete(in_progress, id)}}
end

Expand All @@ -52,7 +52,7 @@ defmodule Honeydew.Queue.ErlangQueue do
(in_progress |> Map.values |> Enum.filter(function))
end

def cancel({pending, in_progress}, %Job{private: private}) do
def cancel(%Job{private: private}, {pending, in_progress}) do
filter = fn
%Job{private: ^private} -> false;
_ -> true
Expand Down
10 changes: 5 additions & 5 deletions lib/honeydew/queue/mnesia.ex
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ defmodule Honeydew.Queue.Mnesia do
# Enqueue/Reserve
#

def enqueue(%State{private: %PState{table: table, access_context: access_context}} = state, job) do
def enqueue(job, %State{private: %PState{table: table, access_context: access_context}} = state) do
job = %{job | private: {false, :erlang.unique_integer([:monotonic])}} # {in_progress, :id}

:mnesia.activity(access_context, fn ->
Expand Down Expand Up @@ -81,14 +81,14 @@ defmodule Honeydew.Queue.Mnesia do
# Ack/Nack
#

def ack(%State{private: %PState{table: table}} = state, %Job{private: private}) do
def ack(%Job{private: private}, %State{private: %PState{table: table}} = state) do
:ok = :mnesia.dirty_delete(table, private)

state
end

def nack(%State{private: %PState{table: table, access_context: access_context}} = state, %Job{private: {_, id},
failure_private: failure_private} = job) do
def nack(%Job{private: {_, id}, failure_private: failure_private} = job, %State{private: %PState{table: table,
access_context: access_context}} = state) do
:mnesia.activity(access_context, fn ->
:ok = :mnesia.delete({table, {true, id}})

Expand Down Expand Up @@ -152,7 +152,7 @@ defmodule Honeydew.Queue.Mnesia do
end)
end

def cancel(%PState{table: table, access_context: access_context} = queue, %Job{private: {_, id}}) do
def cancel(%Job{private: {_, id}, }, %PState{table: table, access_context: access_context} = queue) do
reply =
:mnesia.activity(access_context, fn ->
Job.job(private: {:_, id}, _: :_)
Expand Down
4 changes: 2 additions & 2 deletions lib/honeydew/queue_supervisor.ex
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
defmodule Honeydew.QueueSupervisor do

def start_link(queue, module, args, num_queues, dispatcher, failure_mode) do
def start_link(queue, module, args, num_queues, dispatcher, failure_mode, success_mode) do
import Supervisor.Spec

children = [
worker(module, [queue, args, dispatcher, failure_mode])
worker(module, [queue, args, dispatcher, failure_mode, success_mode])
]

opts = [strategy: :simple_one_for_one,
Expand Down
5 changes: 5 additions & 0 deletions lib/honeydew/success_mode.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
defmodule Honeydew.SuccessMode do
alias Honeydew.Job

@callback handle_success(job :: %Job{}, args :: list) :: any
end
11 changes: 11 additions & 0 deletions lib/honeydew/success_mode/log.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule Honeydew.SuccessMode.Log do
require Logger
alias Honeydew.Job
@behaviour Honeydew.SuccessMode

def handle_success(%Job{enqueued_at: enqueued_at, started_at: started_at, completed_at: completed_at} = job, []) do
queue_time = started_at - enqueued_at
run_time = completed_at - started_at
Logger.info "Job #{inspect job} completed, sat in queue for #{queue_time}ms, and took #{run_time}ms to complete."
end
end
2 changes: 1 addition & 1 deletion test/honeydew/queue_supervisor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Honeydew.QueueSupervisorTest do

Honeydew.create_groups(pool)

{:ok, supervisor} = Honeydew.QueueSupervisor.start_link(pool, ErlangQueue, [], 3, {Honeydew.Dispatcher.LRU, []}, {Honeydew.FailureMode.Abandon, []})
{:ok, supervisor} = Honeydew.QueueSupervisor.start_link(pool, ErlangQueue, [], 3, {Honeydew.Dispatcher.LRU, []}, {Honeydew.FailureMode.Abandon, []}, nil)

# on_exit fn ->
# Supervisor.stop(supervisor)
Expand Down
Loading

1 comment on commit a8d6ad4

@aaronrenner
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

馃帀 鉂わ笍 馃帀 You are really quick at cranking this stuff out! I can't wait to try it. 馃殌

Please sign in to comment.