-
-
Notifications
You must be signed in to change notification settings - Fork 309
/
global.ex
95 lines (69 loc) · 2.15 KB
/
global.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
defmodule Oban.Peers.Global do
  @moduledoc """
  A cluster based peer that coordinates through a distributed registry.

  Leadership is coordinated through global locks. It requires a functional distributed Erlang
  cluster; without one, global plugins (Cron, Lifeline, Stager, etc.) will not function
  correctly.

  ## Usage

  Specify the `Global` peer in your Oban configuration.

      config :my_app, Oban,
        peer: Oban.Peers.Global,
        ...
  """

  @behaviour Oban.Peer

  use GenServer

  alias Oban.Backoff

  defmodule State do
    @moduledoc false

    # conf      - Oban configuration; `conf.name` and `conf.node` make up the lock id
    # name      - the `:name` option, which is also used to register the process
    # timer     - reference of the pending `:election` timer, `nil` before the first election
    # interval  - base delay in milliseconds between election attempts
    # leader?   - result of the most recent lock attempt
    defstruct [
      :conf,
      :name,
      :timer,
      interval: :timer.seconds(30),
      leader?: false
    ]
  end

  # Starts the peer process, registered under `opts[:name]` when one is given.
  @impl Oban.Peer
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: opts[:name])
  end

  # Asks the peer process whether it held the lock after its last election attempt.
  @impl Oban.Peer
  def leader?(pid, timeout \\ 5_000) do
    GenServer.call(pid, :leader?, timeout)
  end

  @impl GenServer
  def init(opts) do
    # Trap exits so that `terminate/2` runs on shutdown and the lock can be released.
    Process.flag(:trap_exit, true)

    # `struct!/2` raises on unknown option keys. The first election runs in `handle_continue/2`
    # to keep `init/1` from blocking on the lock request.
    {:ok, struct!(State, opts), {:continue, :start}}
  end

  @impl GenServer
  def terminate(_reason, %State{timer: timer} = state) do
    if is_reference(timer), do: Process.cancel_timer(timer)

    # Release the lock right away so another node may win the next election.
    if state.leader?, do: :global.del_lock(key(state), nodes())

    :ok
  end

  @impl GenServer
  def handle_continue(:start, %State{} = state) do
    handle_info(:election, state)
  end

  @impl GenServer
  def handle_call(:leader?, _from, %State{} = state) do
    {:reply, state.leader?, state}
  end

  @impl GenServer
  def handle_info(:election, %State{} = state) do
    meta = %{conf: state.conf, leader: state.leader?, peer: __MODULE__}

    locked? =
      :telemetry.span([:oban, :peer, :election], meta, fn ->
        # Zero retries: try once and report the outcome without waiting for the lock.
        locked? = :global.set_lock(key(state), nodes(), 0)

        {locked?, %{meta | leader: locked?}}
      end)

    {:noreply, schedule_election(%{state | leader?: locked?})}
  end

  # This process traps exits, so `{:EXIT, pid, reason}` tuples from linked processes and any
  # other stray message land here. Defining the `:election` clause above replaces the catch-all
  # that `use GenServer` provides, which would make such messages crash the peer with a
  # `FunctionClauseError`. Delegate to the overridable default instead, which logs the
  # unexpected message and keeps the state unchanged.
  def handle_info(message, state), do: super(message, state)

  # Queues the next `:election` message and remembers its timer so `terminate/2` can cancel it.
  # The delay is the configured interval passed through `Backoff.jitter/2` in `:dec` mode.
  defp schedule_election(%State{interval: interval} = state) do
    time = Backoff.jitter(interval, mode: :dec)

    %{state | timer: Process.send_after(self(), :election, time)}
  end

  # Helpers

  # Lock id in the `{resource_id, requester_id}` shape that `:global` expects.
  defp key(state), do: {state.conf.name, state.conf.node}

  # The local node plus every node currently connected to it.
  defp nodes, do: [Node.self() | Node.list()]
end