Skip to content

Commit

Permalink
Support alternative namespacing in PG notifier
Browse files Browse the repository at this point in the history
By default, all Oban instances using the same `prefix` option would
receive notifications from each other. Now you can use the `namespace`
option to separate instances that are in the same cluster _without_
changing the `prefix`.

Addresses #1065
  • Loading branch information
sorentwo committed Apr 10, 2024
1 parent 8755236 commit 98e4b39
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 4 deletions.
23 changes: 19 additions & 4 deletions lib/oban/notifiers/pg.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@ defmodule Oban.Notifiers.PG do
...
```
By default, all Oban instances using the same `prefix` option will receive notifications from
each other. You can use the `namespace` option to separate instances that are in the same
cluster _without_ changing the prefix:
```elixir
config :my_app, Oban,
notifier: {Oban.Notifiers.PG, namespace: :custom_namespace}
...
```
The namespace may be any term.
[pg]: https://www.erlang.org/doc/man/pg.html
[de]: https://elixir-lang.org/getting-started/mix-otp/distributed-tasks.html#our-first-distributed-code
"""
Expand All @@ -28,12 +40,15 @@ defmodule Oban.Notifiers.PG do

alias Oban.Notifier

defstruct [:conf, listeners: %{}]
defstruct [:conf, :namespace, listeners: %{}]

@impl Notifier
def start_link(opts) do
{name, opts} = Keyword.pop(opts, :name)

conf = Keyword.fetch!(opts, :conf)
opts = Keyword.put_new(opts, :namespace, conf.prefix)

GenServer.start_link(__MODULE__, struct!(__MODULE__, opts), name: name)
end

Expand All @@ -49,8 +64,8 @@ defmodule Oban.Notifiers.PG do

@impl Notifier
def notify(server, channel, payload) do
with %{conf: conf} <- get_state(server) do
pids = :pg.get_members(__MODULE__, conf.prefix)
with %{namespace: namespace} <- get_state(server) do
pids = :pg.get_members(__MODULE__, namespace)

for pid <- pids, message <- payload_to_messages(channel, payload) do
send(pid, message)
Expand All @@ -65,7 +80,7 @@ defmodule Oban.Notifiers.PG do
put_state(state)

:pg.start_link(__MODULE__)
:pg.join(__MODULE__, state.conf.prefix, self())
:pg.join(__MODULE__, state.namespace, self())

{:ok, state}
end
Expand Down
28 changes: 28 additions & 0 deletions test/oban/notifiers/pg_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
defmodule Oban.Notifiers.PGTest do
use Oban.Case, async: true

alias Oban.Notifier
alias Oban.Notifiers.PG

describe "namespacing" do
test "namespacing by configured prefix without an override" do
name_1 = start_supervised_oban!(notifier: PG, prefix: "pg_test")
name_2 = start_supervised_oban!(notifier: PG, prefix: "pg_test")

:ok = Notifier.listen(name_1, :signal)
:ok = Notifier.notify(name_2, :signal, %{incoming: "message"})

assert_receive {:notification, :signal, %{"incoming" => "message"}}
end

test "overriding the default namespace" do
name_1 = start_supervised_oban!(notifier: {PG, namespace: :pg_test}, prefix: "pg_a")
name_2 = start_supervised_oban!(notifier: {PG, namespace: :pg_test}, prefix: "pg_b")

:ok = Notifier.listen(name_1, :signal)
:ok = Notifier.notify(name_2, :signal, %{incoming: "message"})

assert_receive {:notification, :signal, %{"incoming" => "message"}}
end
end
end

0 comments on commit 98e4b39

Please sign in to comment.