-
-
Notifications
You must be signed in to change notification settings - Fork 293
/
peer.ex
119 lines (85 loc) · 3.21 KB
/
peer.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
defmodule Oban.Peer do
@moduledoc """
The `Peer` module maintains leadership for a particular Oban instance within a cluster.
Leadership is used by plugins, primarily, to prevent duplicate work accross nodes. For example,
only the leader's `Cron` plugin will try inserting new jobs. You can use peer leadership to
extend Oban with custom plugins, or even within your own application.
Note a few important details about how peer leadership operates:
* Each peer checks for leadership at a 30 second interval. When the leader exits it broadcasts a
message to all other peers to encourage another one to assume leadership.
* Each Oban instances supervises a distinct `Oban.Peer` instance. That means that with multiple
Oban instances on the same node one instance may be the leader, while the others aren't.
* Without leadership, global plugins (Cron, Lifeline, Stager, etc.), will not run on any node.
## Available Peer Implementations
There are two built-in peering modules:
* `Oban.Peers.Postgres` — uses table-based leadership through the `oban_peers` table and works
in any environment, with or without clustering. Only one node (per instance name) will have a
row in the peers table, that node is the leader. This is the default.
* `Oban.Peers.Global` — coordinates global locks through distributed Erlang, requires
distributed Erlang.
## Examples
Check leadership for the default Oban instance:
Oban.Peer.leader?()
# => true
That is identical to using the name `Oban`:
Oban.Peer.leader?(Oban)
# => true
Check leadership for a couple of instances:
Oban.Peer.leader?(Oban.A)
# => true
Oban.Peer.leader?(Oban.B)
# => false
"""
alias Oban.{Config, Registry}
require Logger
@type option ::
{:name, module()}
| {:conf, Config.t()}
| {:interval, timeout()}
@doc """
Starts a peer instance.
"""
@callback start_link([option()]) :: GenServer.on_start()
@doc """
Check whether the current peer instance leads the cluster.
"""
@callback leader?(pid()) :: boolean()
@doc """
Check whether the current instance leads the cluster.
## Example
Check leadership for the default Oban instance:
Oban.Peer.leader?()
# => true
Check leadership for an alternate instance named `Oban.Private`:
Oban.Peer.leader?(Oban.Private)
# => true
"""
@spec leader?(Config.t() | GenServer.server()) :: boolean()
def leader?(conf_or_name \\ Oban, timeout \\ 5_000)
def leader?(%Config{} = conf, timeout) do
case Registry.whereis(conf.name, Oban.Peer) do
pid when is_pid(pid) ->
conf.peer.leader?(pid, timeout)
nil ->
false
end
catch
:exit, {:timeout, _} = reason ->
Logger.warn("""
Oban.Peer.leader?/2 check failed due to #{inspect(reason)}.
""")
false
end
def leader?(name, timeout) do
name
|> Oban.config()
|> leader?(timeout)
end
@doc false
@spec child_spec(Keyword.t()) :: Supervisor.child_spec()
def child_spec(opts) do
conf = Keyword.fetch!(opts, :conf)
opts = Keyword.put_new(opts, :name, conf.peer)
%{id: opts[:name], start: {conf.peer, :start_link, [opts]}}
end
end