Skip to content

Commit

Permalink
more refactoring and code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
msimonborg committed Apr 3, 2022
1 parent 1204fc3 commit 94a950b
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 178 deletions.
14 changes: 6 additions & 8 deletions lib/odd_job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ defmodule OddJob do
def async_perform(pool, function) when is_atom(pool) and is_function(function, 0) do
case pool |> queue_name() do
{:error, :not_found} -> {:error, :not_found}
_ -> Async.perform(pool, function)
queue -> Async.perform(pool, queue, function)
end
end

Expand Down Expand Up @@ -339,7 +339,7 @@ defmodule OddJob do
when is_atom(pool) and is_enumerable(collection) and is_function(function, 1) do
case pool |> queue_name() do
{:error, :not_found} -> {:error, :not_found}
_ -> Async.perform_many(pool, collection, function)
queue -> Async.perform_many(pool, queue, collection, function)
end
end

Expand Down Expand Up @@ -367,9 +367,8 @@ defmodule OddJob do
"""
@doc since: "0.1.0"
@spec await(job, timeout) :: term
def await(job, timeout \\ 5000) when is_struct(job, Job) do
Async.await(job, timeout)
end
def await(job, timeout \\ 5000) when is_struct(job, Job),
do: Async.await(job, timeout)

@doc """
Awaits replies from multiple async jobs and returns them in a list.
Expand Down Expand Up @@ -398,9 +397,8 @@ defmodule OddJob do
"""
@doc since: "0.2.0"
@spec await_many([job], timeout) :: [term]
def await_many(jobs, timeout \\ 5000) when is_list(jobs) do
Async.await_many(jobs, timeout)
end
def await_many(jobs, timeout \\ 5000) when is_list(jobs),
do: Async.await_many(jobs, timeout)

@doc """
Sends a job to the `pool` after the given `timer` has elapsed.
Expand Down
4 changes: 1 addition & 3 deletions lib/odd_job/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ defmodule OddJob.Application do
end

defp extra_pools do
extra_pools = Application.get_env(:odd_job, :extra_pools, [])

for pool <- extra_pools do
for pool <- Application.get_env(:odd_job, :extra_pools, []) do
start_arg =
case pool do
{name, opts} when is_atom(name) and is_list(opts) ->
Expand Down
47 changes: 15 additions & 32 deletions lib/odd_job/async.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,30 @@ defmodule OddJob.Async do
@moduledoc false
@moduledoc since: "0.1.0"

alias OddJob.{Async.ProxySupervisor, Job, Utils}
alias OddJob.{Async.ProxySupervisor, Job, Queue, Utils}

@type job :: OddJob.Job.t()
@type job :: Job.t()
@type queue :: Queue.queue()

@doc since: "0.1.0"
@spec perform(atom, fun) :: job
def perform(pool, fun) when is_atom(pool) and is_function(fun) do
@spec perform(atom, queue, fun) :: job
def perform(pool, queue, fun) when is_atom(pool) and is_function(fun) do
pool
|> ProxySupervisor.start_child()
|> Utils.link_and_monitor()
|> build_job(fun)
|> run_proxy_with_job(pool)
|> tap(fn job -> Queue.perform(queue, job) end)
end

@spec run_proxy_with_job(job, atom) :: job
defp run_proxy_with_job(job, pool) do
GenServer.call(job.proxy, {:run, pool, job})
end

@spec perform_many(atom, list | map, function) :: [job]
def perform_many(pool, collection, fun) do
jobs =
for item <- collection do
pool
|> ProxySupervisor.start_child()
|> Utils.link_and_monitor()
|> build_job(fn -> fun.(item) end)
|> send_job_to_proxy()
end

pool
|> Utils.queue_name()
|> GenServer.cast({:perform_many, jobs})

jobs
@spec perform_many(atom, queue, list | map, function) :: [job]
def perform_many(pool, queue, collection, fun) do
for item <- collection do
pool
|> ProxySupervisor.start_child()
|> Utils.link_and_monitor()
|> build_job(fn -> fun.(item) end)
end
|> tap(fn jobs -> Queue.perform_many(queue, jobs) end)
end

@spec build_job({pid, reference}, function) :: job
Expand All @@ -50,12 +39,6 @@ defmodule OddJob.Async do
}
end

@spec send_job_to_proxy(job) :: job
defp send_job_to_proxy(job) do
GenServer.cast(job.proxy, {:job, job})
job
end

# The rest of this module, covering implementation of the `await` and `await_many` functions,
# was copied and adapted from the analogous functions in the Elixir standard library's `Task` module.
# The intention of these functions is to mimic the expected behavior of the `Task` functions.
Expand Down
42 changes: 21 additions & 21 deletions lib/odd_job/async/proxy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ defmodule OddJob.Async.Proxy do
@doc false
use GenServer, restart: :temporary

alias OddJob.Utils

defstruct [:worker_ref, :job]

@type t :: %__MODULE__{
Expand All @@ -28,12 +26,26 @@ defmodule OddJob.Async.Proxy do
}

@type job :: OddJob.Job.t()
@type proxy :: pid

# <---- Client API ---->

@doc false
@spec start_link([]) :: :ignore | {:error, any} | {:ok, pid}
def start_link([]) do
GenServer.start_link(__MODULE__, [])
end
def start_link([]),
do: GenServer.start_link(__MODULE__, [])

@doc false
@spec link_and_monitor_caller(proxy) :: {:ok, reference}
def link_and_monitor_caller(proxy),
do: GenServer.call(proxy, :link_and_monitor_caller)

@doc false
@spec report_completed_job(proxy, job) :: :ok
def report_completed_job(proxy, job),
do: GenServer.call(proxy, {:job_complete, job})

# <---- Callbacks ---->

@impl GenServer
@spec init(any) :: {:ok, any}
Expand All @@ -42,33 +54,21 @@ defmodule OddJob.Async.Proxy do
end

@impl GenServer
def handle_cast({:job, job}, state), do: {:noreply, %{state | job: job}}

@impl GenServer
def handle_call({:run, pool, job}, _from, state) do
pool
|> Utils.queue_name()
|> GenServer.cast({:perform, job})

{:reply, job, %{state | job: job}}
end

def handle_call(:link_and_monitor, {from, _}, state) do
def handle_call(:link_and_monitor_caller, {from, _}, state) do
Process.link(from)
ref = Process.monitor(from)
{:reply, :ok, %{state | worker_ref: ref}}
{:reply, {:ok, ref}, %{state | worker_ref: ref}}
end

def handle_call({:complete, job}, {from, _}, %{worker_ref: ref} = state) do
def handle_call({:job_complete, job}, {from, _}, %{worker_ref: ref} = state) do
Process.unlink(from)
Process.demonitor(ref, [:flush])
Process.send(job.owner, {job.ref, job.results}, [])
{:stop, :normal, :ok, state}
end

@impl GenServer
def handle_info({:DOWN, ref, :process, _pid, reason}, %{worker_ref: worker_ref} = state)
when ref == worker_ref do
def handle_info({:DOWN, _, _, _, reason}, state) do
{:stop, reason, state}
end
end
56 changes: 28 additions & 28 deletions lib/odd_job/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,34 +35,6 @@ defmodule OddJob.Pool do
| {:max_restarts, non_neg_integer}
| {:max_seconds, non_neg_integer}

@doc false
@spec child_spec(options) :: child_spec
def child_spec(opts) when is_list(opts) do
[name: __MODULE__]
|> Keyword.merge(opts)
|> super()
|> Supervisor.child_spec(id: opts[:name])
end

@doc false
@spec start_link(options) :: Supervisor.on_start()
def start_link(opts) when is_list(opts) do
{name, init_opts} = Keyword.pop!(opts, :name)
Supervisor.start_link(__MODULE__, [name, init_opts], name: name)
end

@impl Supervisor
def init([name, _opts] = args) do
children = [
{OddJob.Async.ProxySupervisor, name},
{OddJob.Scheduler.Supervisor, name},
{OddJob.Queue, name},
{OddJob.Pool.Supervisor, args}
]

Supervisor.init(children, strategy: :one_for_one)
end

@doc false
@doc since: "0.4.0"
defmacro __using__(opts) do
Expand Down Expand Up @@ -105,4 +77,32 @@ defmodule OddJob.Pool do
defoverridable child_spec: 1, start_link: 1
end
end

@doc false
@spec child_spec(options) :: child_spec
def child_spec(opts) when is_list(opts) do
[name: __MODULE__]
|> Keyword.merge(opts)
|> super()
|> Supervisor.child_spec(id: opts[:name])
end

@doc false
@spec start_link(options) :: Supervisor.on_start()
def start_link(opts) when is_list(opts) do
{name, init_opts} = Keyword.pop!(opts, :name)
Supervisor.start_link(__MODULE__, [name, init_opts], name: name)
end

@impl Supervisor
def init([name, _opts] = args) do
children = [
{OddJob.Async.ProxySupervisor, name},
{OddJob.Scheduler.Supervisor, name},
{OddJob.Queue, name},
{OddJob.Pool.Supervisor, args}
]

Supervisor.init(children, strategy: :one_for_one)
end
end
9 changes: 2 additions & 7 deletions lib/odd_job/pool/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,8 @@ defmodule OddJob.Pool.Supervisor do
alias OddJob.Utils

@doc false
def start_link([name, _opts] = args) do
opts = [
name: Utils.pool_supervisor_name(name)
]

Supervisor.start_link(__MODULE__, args, opts)
end
def start_link([name, _opts] = args),
do: Supervisor.start_link(__MODULE__, args, name: Utils.pool_supervisor_name(name))

@impl Supervisor
def init([name, opts]) do
Expand Down
48 changes: 26 additions & 22 deletions lib/odd_job/pool/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule OddJob.Pool.Worker do

use GenServer

alias OddJob.Utils
alias OddJob.{Async.Proxy, Queue}

defstruct [:id, :pool, :queue_pid, :queue_name]

Expand All @@ -22,6 +22,8 @@ defmodule OddJob.Pool.Worker do
queue_name: OddJob.Queue.queue_name()
}

# <---- Client API ---->

@doc false
def child_spec(opts) do
opts
Expand All @@ -34,51 +36,53 @@ defmodule OddJob.Pool.Worker do
GenServer.start_link(__MODULE__, opts)
end

@doc false
def perform(worker, job) do
GenServer.cast(worker, {:perform, job})
end

# <---- Callbacks ---->

@impl GenServer
def init(opts) do
pool = Keyword.fetch!(opts, :pool)

with queue_name = {:via, _, _} <- OddJob.queue_name(pool),
queue_pid when is_pid(queue_pid) <- GenServer.whereis(queue_name) do
state = struct(__MODULE__, opts ++ [queue_pid: queue_pid, queue_name: queue_name])
Process.monitor(queue_pid)
OddJob.Queue.monitor_worker(queue_name, self())
{:ok, state}
Queue.monitor_worker(queue_name, self())
{:ok, struct(__MODULE__, opts ++ [queue_pid: queue_pid, queue_name: queue_name])}
else
_ -> raise RuntimeError, message: "#{inspect(pool)} queue process cannot be found"
end
end

@impl GenServer
def handle_cast({:do_perform, %{async: true, proxy: proxy} = job}, %{pool: pool} = state) do
GenServer.call(proxy, :link_and_monitor)
job = do_perform(pool, job)
GenServer.call(proxy, {:complete, job})
def handle_cast({:perform, %{async: true, proxy: proxy} = job}, state) do
{:ok, _} = Proxy.link_and_monitor_caller(proxy)
job = do_perform(state.queue_name, job)
Proxy.report_completed_job(proxy, job)
{:noreply, state}
end

def handle_cast({:do_perform, job}, %{pool: pool} = state) do
do_perform(pool, job)
def handle_cast({:perform, job}, state) do
do_perform(state.queue_name, job)
{:noreply, state}
end

defp do_perform(pool, job) do
defp do_perform(queue_name, job) do
results = job.function.()

pool
|> Utils.queue_name()
|> GenServer.cast({:complete, self()})

Queue.request_new_job(queue_name, self())
%{job | results: results}
end

@impl GenServer
def handle_info({:DOWN, _, _, pid, _}, %{queue_pid: queue_pid, queue_name: queue_name} = state)
when pid == queue_pid do
new_queue_pid = check_for_new_queue_process(queue_name)
Process.monitor(new_queue_pid)
OddJob.Queue.monitor_worker(queue_name, self())
{:noreply, %{state | queue_pid: new_queue_pid}}
def handle_info({:DOWN, _, _, pid, _}, %{queue_pid: q_pid} = state) when pid == q_pid do
q_name = state.queue_name
new_q_pid = check_for_new_queue_process(q_name)
Process.monitor(new_q_pid)
Queue.monitor_worker(q_name, self())
{:noreply, %{state | queue_pid: new_q_pid}}
end

defp check_for_new_queue_process(queue_name) do
Expand Down
Loading

0 comments on commit 94a950b

Please sign in to comment.