Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ If you're using the default tenant, the URL is `ws://realtime-dev.localhost:4000
| REBALANCE_CHECK_INTERVAL_IN_MS | number | Time in ms to check if process is in the right region |
| DISCONNECT_SOCKET_ON_NO_CHANNELS_INTERVAL_IN_MS | number | Time in ms to check if a socket has no channels open and if so, disconnect it |
| BROADCAST_POOL_SIZE | number | Number of processes to relay Phoenix.PubSub messages across the cluster |

| POSTGRES_CDC_SCOPE_SHARDS | number | Number of dynamic supervisor partitions used by the Postgres CDC extension. Defaults to 5. |
| USERS_SCOPE_SHARDS | number | Number of dynamic supervisor partitions used by the Users extension. Defaults to 5. |
Comment on lines +195 to +196
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 I think the description is wrong here


The OpenTelemetry variables mentioned above are not an exhaustive list of all [supported environment variables](https://opentelemetry.io/docs/languages/sdk-configuration/).

Expand Down
2 changes: 2 additions & 0 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10)
pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "gen_rpc") |> String.to_atom()
websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize))
users_scope_shards = Env.get_integer("USERS_SCOPE_SHARDS", 5)
postgres_cdc_scope_shards = Env.get_integer("POSTGRES_CDC_SCOPE_SHARDS", 5)
Comment on lines 73 to +74
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we should document this as part of the README?

regional_broadcasting = Env.get_boolean("REGIONAL_BROADCASTING", false)

no_channel_timeout_in_ms =
Expand Down Expand Up @@ -130,6 +131,7 @@ config :realtime,
pubsub_adapter: pubsub_adapter,
broadcast_pool_size: broadcast_pool_size,
users_scope_shards: users_scope_shards,
postgres_cdc_scope_shards: postgres_cdc_scope_shards,
regional_broadcasting: regional_broadcasting

if config_env() != :test && run_janitor? do
Expand Down
17 changes: 11 additions & 6 deletions lib/extensions/postgres_cdc_rls/cdc_rls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ defmodule Extensions.PostgresCdcRls do

@spec handle_stop(String.t(), non_neg_integer()) :: :ok
def handle_stop(tenant, timeout) when is_binary(tenant) do
case :syn.whereis_name({__MODULE__, tenant}) do
scope = Realtime.Syn.PostgresCdc.scope(tenant)

case :syn.whereis_name({scope, tenant}) do
:undefined ->
Logger.warning("Database supervisor not found for tenant #{tenant}")
:ok
Expand Down Expand Up @@ -124,7 +126,9 @@ defmodule Extensions.PostgresCdcRls do

@spec get_manager_conn(String.t()) :: {:error, nil | :wait} | {:ok, pid(), pid()}
def get_manager_conn(id) do
case :syn.lookup(__MODULE__, id) do
scope = Realtime.Syn.PostgresCdc.scope(id)

case :syn.lookup(scope, id) do
{_, %{manager: nil, subs_pool: nil}} -> {:error, :wait}
{_, %{manager: manager, subs_pool: conn}} -> {:ok, manager, conn}
_ -> {:error, nil}
Expand All @@ -133,12 +137,15 @@ defmodule Extensions.PostgresCdcRls do

@spec supervisor_id(String.t(), String.t()) :: {atom(), String.t(), map()}
def supervisor_id(tenant, region) do
{__MODULE__, tenant, %{region: region, manager: nil, subs_pool: nil}}
scope = Realtime.Syn.PostgresCdc.scope(tenant)
{scope, tenant, %{region: region, manager: nil, subs_pool: nil}}
end

@spec update_meta(String.t(), pid(), pid()) :: {:ok, {pid(), term()}} | {:error, term()}
def update_meta(tenant, manager_pid, subs_pool) do
:syn.update_registry(__MODULE__, tenant, fn pid, meta ->
scope = Realtime.Syn.PostgresCdc.scope(tenant)

:syn.update_registry(scope, tenant, fn pid, meta ->
if node(pid) == node(manager_pid) do
%{meta | manager: manager_pid, subs_pool: subs_pool}
else
Expand All @@ -148,6 +155,4 @@ defmodule Extensions.PostgresCdcRls do
end
end)
end

def syn_topic(tenant_id), do: "cdc_rls:#{tenant_id}"
end
2 changes: 1 addition & 1 deletion lib/extensions/postgres_cdc_rls/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Extensions.PostgresCdcRls.Supervisor do
def init(_args) do
load_migrations_modules()

:syn.add_node_to_scopes([PostgresCdcRls])
:syn.add_node_to_scopes(Realtime.Syn.PostgresCdc.scopes())

children = [
{
Expand Down
4 changes: 3 additions & 1 deletion lib/realtime/operations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ defmodule Realtime.Operations do
"""
def rebalance do
Enum.reduce(:syn.group_names(:users), 0, fn tenant, acc ->
case :syn.lookup(Extensions.PostgresCdcRls, tenant) do
scope = Realtime.Syn.PostgresCdc.scope(tenant)

case :syn.lookup(scope, tenant) do
{pid, %{region: region}} ->
platform_region = Realtime.Nodes.platform_region_translator(region)
current_node = node(pid)
Expand Down
23 changes: 23 additions & 0 deletions lib/realtime/syn/postgres_cdc.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule Realtime.Syn.PostgresCdc do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:nitpick: Should this be scoped (no pun intended) like this Extensions.PostgresCdcRls.Syn instead? No strong opinion tbh as this is highly subjective.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to do something similar for Connect so probably will keep them all together so we can abstract more syn stuff

@moduledoc """
Scope for the PostgresCdc module.
"""

@doc """
Returns the scope for a given tenant id.
"""
@spec scope(String.t()) :: atom()
def scope(tenant_id) do
shards = Application.fetch_env!(:realtime, :postgres_cdc_scope_shards)
shard = :erlang.phash2(tenant_id, shards)
:"realtime_postgres_cdc_#{shard}"
end

def scopes() do
shards = Application.fetch_env!(:realtime, :postgres_cdc_scope_shards)
Enum.map(0..(shards - 1), fn shard -> :"realtime_postgres_cdc_#{shard}" end)
end

def syn_topic_prefix(), do: "realtime_postgres_cdc_"
def syn_topic(tenant_id), do: "#{syn_topic_prefix()}#{tenant_id}"
end
35 changes: 23 additions & 12 deletions lib/realtime/syn_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ defmodule Realtime.SynHandler do
Custom defined Syn's callbacks
"""
require Logger
alias Extensions.PostgresCdcRls
alias RealtimeWeb.Endpoint
alias Realtime.Syn.PostgresCdc
alias Realtime.Tenants.Connect
alias RealtimeWeb.Endpoint

@behaviour :syn_event_handler

Expand All @@ -15,12 +15,17 @@ defmodule Realtime.SynHandler do
Endpoint.local_broadcast(Connect.syn_topic(tenant_id), "ready", %{pid: pid, conn: conn})
end

def on_registry_process_updated(PostgresCdcRls, tenant_id, _pid, meta, _reason) do
# Update that the CdCRls connection is ready
Endpoint.local_broadcast(PostgresCdcRls.syn_topic(tenant_id), "ready", meta)
end
def on_registry_process_updated(scope, tenant_id, _pid, meta, _reason) do
scope = Atom.to_string(scope)

def on_registry_process_updated(_scope, _name, _pid, _meta, _reason), do: :ok
case scope do
"realtime_postgres_cdc_" <> _ ->
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:nitpick: What if we keep this as part of the Syn module you created and not expose the prefix here?

@postgres_cdc_scope_prefix  Realtime.Syn.PostgresCdc.scope_prefix

case scope do
      @postgres_cdc_scope_prefix <> _ ->
...

Endpoint.local_broadcast(PostgresCdc.syn_topic(tenant_id), "ready", meta)

_ ->
:ok
end
end

@doc """
When processes registered with :syn are unregistered, either manually or by stopping, this
Expand All @@ -31,14 +36,20 @@ defmodule Realtime.SynHandler do
We want to log conflict resolutions to know when more than one process on the cluster
was started, and subsequently stopped because :syn handled the conflict.
"""
@postgres_cdc_scope_prefix PostgresCdc.syn_topic_prefix()
@impl true
def on_process_unregistered(mod, name, pid, _meta, reason) do
if reason == :syn_conflict_resolution do
log("#{mod} terminated due to syn conflict resolution: #{inspect(name)} #{inspect(pid)}")
def on_process_unregistered(scope, name, pid, _meta, reason) do
case Atom.to_string(scope) do
@postgres_cdc_scope_prefix <> _ = scope ->
Endpoint.local_broadcast(PostgresCdc.syn_topic(name), scope <> "_down", %{pid: pid, reason: reason})

_ ->
topic = topic(scope)
Endpoint.local_broadcast(topic <> ":" <> name, topic <> "_down", %{pid: pid, reason: reason})
end

topic = topic(mod)
Endpoint.local_broadcast(topic <> ":" <> name, topic <> "_down", %{pid: pid, reason: reason})
if reason == :syn_conflict_resolution,
do: log("#{scope} terminated due to syn conflict resolution: #{inspect(name)} #{inspect(pid)}")

:ok
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.57.3",
version: "2.57.4",
elixir: "~> 1.18",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
24 changes: 15 additions & 9 deletions test/realtime/extensions/cdc_rls/cdc_rls_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ defmodule Realtime.Extensions.CdcRlsTest do
metadata = [metadata: subscription_metadata]
:ok = PostgresCdc.subscribe(PostgresCdcRls, pg_change_params, external_id, metadata)

RealtimeWeb.Endpoint.subscribe(PostgresCdcRls.syn_topic(tenant.external_id))
RealtimeWeb.Endpoint.subscribe(Realtime.Syn.PostgresCdc.syn_topic(tenant.external_id))
# First time it will return nil
PostgresCdcRls.handle_connect(args)
# Wait for it to start
Expand All @@ -88,14 +88,17 @@ defmodule Realtime.Extensions.CdcRlsTest do
# Now subscribe to the Postgres Changes
{:ok, _} = PostgresCdcRls.handle_after_connect(response, postgres_extension, pg_change_params)

RealtimeWeb.Endpoint.unsubscribe(PostgresCdcRls.syn_topic(tenant.external_id))
RealtimeWeb.Endpoint.unsubscribe(Realtime.Syn.PostgresCdc.syn_topic(tenant.external_id))
%{tenant: tenant}
end

test "supervisor crash must not respawn", %{tenant: tenant} do
scope = Realtime.Syn.PostgresCdc.scope(tenant.external_id)

sup =
Enum.reduce_while(1..30, nil, fn _, acc ->
:syn.lookup(Extensions.PostgresCdcRls, tenant.external_id)
scope
|> :syn.lookup(tenant.external_id)
|> case do
:undefined ->
Process.sleep(500)
Expand All @@ -109,16 +112,16 @@ defmodule Realtime.Extensions.CdcRlsTest do
assert Process.alive?(sup)
Process.monitor(sup)

RealtimeWeb.Endpoint.subscribe(PostgresCdcRls.syn_topic(tenant.external_id))
RealtimeWeb.Endpoint.subscribe(Realtime.Syn.PostgresCdc.syn_topic(tenant.external_id))

Process.exit(sup, :kill)
assert_receive {:DOWN, _, :process, ^sup, _reason}, 5000

assert_receive %{event: "postgres_cdc_rls_down"}
scope_down = Atom.to_string(scope) <> "_down"

assert_receive {:DOWN, _, :process, ^sup, _reason}, 5000
assert_receive %{event: ^scope_down}
refute_receive %{event: "ready"}, 1000

:undefined = :syn.lookup(Extensions.PostgresCdcRls, tenant.external_id)
:undefined = :syn.lookup(Realtime.Syn.PostgresCdc.scope(tenant.external_id), tenant.external_id)
end

test "Subscription manager updates oids", %{tenant: tenant} do
Expand Down Expand Up @@ -150,7 +153,10 @@ defmodule Realtime.Extensions.CdcRlsTest do
test "Stop tenant supervisor", %{tenant: tenant} do
sup =
Enum.reduce_while(1..10, nil, fn _, acc ->
case :syn.lookup(Extensions.PostgresCdcRls, tenant.external_id) do
tenant.external_id
|> Realtime.Syn.PostgresCdc.scope()
|> :syn.lookup(tenant.external_id)
|> case do
:undefined ->
Process.sleep(500)
{:cont, acc}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ defmodule Realtime.Extensions.CdcRls.SubscriptionManagerTest do
|> Map.put("subscribers_nodes_table", subscribers_nodes_table)

# register this process with syn as if this was the WorkersSupervisor
:syn.register(PostgresCdcRls, tenant.external_id, self(), %{region: "us-east-1", manager: nil, subs_pool: nil})

scope = Realtime.Syn.PostgresCdc.scope(tenant.external_id)
:syn.register(scope, tenant.external_id, self(), %{region: "us-east-1", manager: nil, subs_pool: nil})

{:ok, pid} = SubscriptionManager.start_link(Map.put(args, "id", tenant.external_id))
# This serves so that we know that handle_continue has finished
Expand Down
2 changes: 1 addition & 1 deletion test/realtime/syn_handler_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ defmodule Realtime.SynHandlerTest do
end
end

defp assert_process_down(pid, reason \\ nil, timeout \\ 100) do
defp assert_process_down(pid, reason, timeout) do
ref = Process.monitor(pid)

if reason do
Expand Down