-
-
Notifications
You must be signed in to change notification settings - Fork 293
/
gossip.ex
133 lines (94 loc) · 3.46 KB
/
gossip.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
defmodule Oban.Plugins.Gossip do
@moduledoc """
The Gossip plugin uses PubSub to periodically exchange queue state information between all
interested nodes. This allows Oban instances to broadcast state information regardless of which
engine they are using, and without storing anything in the database.
Gossip enables real-time updates across an entire cluster, and is essential to the operation of
UIs like Oban Web.
The Gossip plugin entirely replaced heartbeats and the legacy `oban_beats` table.
## Using the Plugin
The following example demonstrates using the plugin without any configuration, which will broadcast
the state of each local queue every 1 second:
config :my_app, Oban,
plugins: [Oban.Plugins.Gossip],
...
Override the default options to broadcast every 5 seconds:
config :my_app, Oban,
plugins: [{Oban.Plugins.Gossip, interval: :timer.seconds(5)}],
...
## Options
* `:interval` — the number of milliseconds between gossip broadcasts
## Instrumenting with Telemetry
The `Oban.Plugins.Gossip` plugin adds the following metadata to the `[:oban, :plugin, :stop]` event:
* `:gossip_count` - the number of queues that had activity broadcasted
"""
@behaviour Oban.Plugin
use GenServer
alias Oban.{Notifier, Plugin, Validation}
@type option :: Plugin.option() | {:interval, pos_integer()}
defmodule State do
@moduledoc false
defstruct [:conf, :name, :timer, interval: :timer.seconds(1)]
end
@impl Plugin
@spec start_link([option()]) :: GenServer.on_start()
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: opts[:name])
end
@impl Plugin
def validate(opts) do
Validation.validate(opts, fn
{:conf, _} -> :ok
{:name, _} -> :ok
{:interval, interval} -> Validation.validate_integer(:interval, interval)
option -> {:unknown, option, State}
end)
end
@impl GenServer
def init(opts) do
Validation.validate!(opts, &validate/1)
Process.flag(:trap_exit, true)
state =
State
|> struct!(opts)
|> schedule_gossip()
:telemetry.execute([:oban, :plugin, :init], %{}, %{conf: state.conf, plugin: __MODULE__})
{:ok, state}
end
@impl GenServer
def terminate(_reason, state) do
if is_reference(state.timer), do: Process.cancel_timer(state.timer)
:ok
end
@impl GenServer
def handle_info(:gossip, %State{} = state) do
meta = %{conf: state.conf, plugin: __MODULE__}
match = [{{{state.conf.name, {:producer, :_}}, :"$1", :_}, [], [:"$1"]}]
:telemetry.span([:oban, :plugin], meta, fn ->
checks =
Oban.Registry
|> Registry.select(match)
|> Enum.map(&safe_check(&1, state))
|> Enum.reject(&is_nil/1)
|> Enum.map(&sanitize_name/1)
if Enum.any?(checks), do: Notifier.notify(state.conf, :gossip, checks)
{:ok, Map.put(meta, :gossip_count, length(checks))}
end)
{:noreply, schedule_gossip(state)}
end
def handle_info(_message, state) do
{:noreply, state}
end
# Scheduling
defp schedule_gossip(state) do
%{state | timer: Process.send_after(self(), :gossip, state.interval)}
end
# Checking
defp safe_check(pid, state) do
if Process.alive?(pid), do: GenServer.call(pid, :check, state.interval)
catch
:exit, _ -> nil
end
defp sanitize_name(%{name: name} = check) when is_binary(name), do: check
defp sanitize_name(%{name: name} = check), do: %{check | name: inspect(name)}
end