Skip to content

Commit

Permalink
Store PG notifier state in the registry
Browse files Browse the repository at this point in the history
To avoid `GenServer.call` timeouts when the system is under high load we
pull the state from the notifier's registry metadata.
  • Loading branch information
sorentwo committed May 17, 2023
1 parent e1d1db0 commit 0760e99
Showing 1 changed file with 19 additions and 37 deletions.
56 changes: 19 additions & 37 deletions lib/oban/notifiers/pg.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ defmodule Oban.Notifiers.PG do

@impl Notifier
def notify(server, channel, payload) do
with %State{} = state <- GenServer.call(server, :get_state),
[_ | _] = pids <- members(state.conf.prefix) do
with %State{conf: conf} <- get_state(server) do
pids = :pg.get_members(__MODULE__, conf.prefix)

Check warning on line 85 in lib/oban/notifiers/pg.ex

View workflow job for this annotation

GitHub Actions / ci (1.12, 22.3, 12.13-alpine, lite)

:pg.get_members/2 is undefined (module :pg is not available or is yet to be defined)

for pid <- pids, message <- payload_to_messages(channel, payload) do
send(pid, message)
end
Expand All @@ -95,13 +96,27 @@ defmodule Oban.Notifiers.PG do
def init(opts) do
state = struct!(State, opts)

start_pg()
put_state(state)

:ok = join(state.conf.prefix)
:pg.start_link(__MODULE__)

Check warning on line 101 in lib/oban/notifiers/pg.ex

View workflow job for this annotation

GitHub Actions / ci (1.12, 22.3, 12.13-alpine, lite)

:pg.start_link/1 is undefined (module :pg is not available or is yet to be defined)
:pg.join(__MODULE__, state.conf.prefix, self())

Check warning on line 102 in lib/oban/notifiers/pg.ex

View workflow job for this annotation

GitHub Actions / ci (1.12, 22.3, 12.13-alpine, lite)

:pg.join/3 is undefined (module :pg is not available or is yet to be defined)

{:ok, state}
end

defp put_state(state) do
Registry.update_value(Oban.Registry, {state.conf.name, Oban.Notifier}, fn _ -> state end)
end

defp get_state(server) do
[name] = Registry.keys(Oban.Registry, server)

case Oban.Registry.lookup(name) do
{_pid, state} -> state
nil -> :error
end
end

@impl GenServer
def handle_call({:listen, channels}, {pid, _}, %State{listeners: listeners} = state) do
if Map.has_key?(listeners, pid) do
Expand All @@ -125,8 +140,6 @@ defmodule Oban.Notifiers.PG do
{:reply, :ok, %{state | listeners: listeners}}
end

def handle_call(:get_state, _from, state), do: {:reply, state, state}

@impl GenServer
def handle_info({:notification, channel, payload}, %State{} = state) do
listeners = for {pid, channels} <- state.listeners, channel in channels, do: pid
Expand All @@ -140,37 +153,6 @@ defmodule Oban.Notifiers.PG do
{:noreply, state}
end

## PG Helpers

if Code.ensure_loaded?(:pg) do
defp start_pg do
:pg.start_link(__MODULE__)
end

defp members(prefix) do
:pg.get_members(__MODULE__, prefix)
end

defp join(prefix) do
:ok = :pg.join(__MODULE__, prefix, self())
end
else
defp start_pg, do: :ok

defp members(prefix) do
:pg2.get_members(namespace(prefix))
end

defp join(prefix) do
namespace = namespace(prefix)

:ok = :pg2.create(namespace)
:ok = :pg2.join(namespace, self())
end

defp namespace(prefix), do: {:oban, prefix}
end

## Message Helpers

defp payload_to_messages(channel, payload) do
Expand Down

0 comments on commit 0760e99

Please sign in to comment.