diff --git a/README.md b/README.md index 7dd223bf3..90f690ffe 100644 --- a/README.md +++ b/README.md @@ -192,7 +192,8 @@ If you're using the default tenant, the URL is `ws://realtime-dev.localhost:4000 | REBALANCE_CHECK_INTERVAL_IN_MS | number | Time in ms to check if process is in the right region | | DISCONNECT_SOCKET_ON_NO_CHANNELS_INTERVAL_IN_MS | number | Time in ms to check if a socket has no channels open and if so, disconnect it | | BROADCAST_POOL_SIZE | number | Number of processes to relay Phoenix.PubSub messages across the cluster | - +| POSTGRES_CDC_SCOPE_SHARDS | number | Number of dynamic supervisor partitions used by the Postgres CDC extension. Defaults to 5. | +| USERS_SCOPE_SHARDS | number | Number of dynamic supervisor partitions used by the Users extension. Defaults to 5. | The OpenTelemetry variables mentioned above are not an exhaustive list of all [supported environment variables](https://opentelemetry.io/docs/languages/sdk-configuration/). diff --git a/config/runtime.exs b/config/runtime.exs index f3319d636..1d4880b35 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -71,6 +71,7 @@ broadcast_pool_size = Env.get_integer("BROADCAST_POOL_SIZE", 10) pubsub_adapter = System.get_env("PUBSUB_ADAPTER", "gen_rpc") |> String.to_atom() websocket_max_heap_size = div(Env.get_integer("WEBSOCKET_MAX_HEAP_SIZE", 50_000_000), :erlang.system_info(:wordsize)) users_scope_shards = Env.get_integer("USERS_SCOPE_SHARDS", 5) +postgres_cdc_scope_shards = Env.get_integer("POSTGRES_CDC_SCOPE_SHARDS", 5) regional_broadcasting = Env.get_boolean("REGIONAL_BROADCASTING", false) no_channel_timeout_in_ms = @@ -130,6 +131,7 @@ config :realtime, pubsub_adapter: pubsub_adapter, broadcast_pool_size: broadcast_pool_size, users_scope_shards: users_scope_shards, + postgres_cdc_scope_shards: postgres_cdc_scope_shards, regional_broadcasting: regional_broadcasting if config_env() != :test && run_janitor? do diff --git a/lib/extensions/postgres_cdc_rls/cdc_rls.ex b/lib/extensions/postgres_cdc_rls/cdc_rls.ex index 8a131cbe1..ed44757c4 100644 --- a/lib/extensions/postgres_cdc_rls/cdc_rls.ex +++ b/lib/extensions/postgres_cdc_rls/cdc_rls.ex @@ -68,7 +68,9 @@ defmodule Extensions.PostgresCdcRls do @spec handle_stop(String.t(), non_neg_integer()) :: :ok def handle_stop(tenant, timeout) when is_binary(tenant) do - case :syn.whereis_name({__MODULE__, tenant}) do + scope = Realtime.Syn.PostgresCdc.scope(tenant) + + case :syn.whereis_name({scope, tenant}) do :undefined -> Logger.warning("Database supervisor not found for tenant #{tenant}") :ok @@ -124,7 +126,9 @@ defmodule Extensions.PostgresCdcRls do @spec get_manager_conn(String.t()) :: {:error, nil | :wait} | {:ok, pid(), pid()} def get_manager_conn(id) do - case :syn.lookup(__MODULE__, id) do + scope = Realtime.Syn.PostgresCdc.scope(id) + + case :syn.lookup(scope, id) do {_, %{manager: nil, subs_pool: nil}} -> {:error, :wait} {_, %{manager: manager, subs_pool: conn}} -> {:ok, manager, conn} _ -> {:error, nil} @@ -133,12 +137,15 @@ defmodule Extensions.PostgresCdcRls do @spec supervisor_id(String.t(), String.t()) :: {atom(), String.t(), map()} def supervisor_id(tenant, region) do - {__MODULE__, tenant, %{region: region, manager: nil, subs_pool: nil}} + scope = Realtime.Syn.PostgresCdc.scope(tenant) + {scope, tenant, %{region: region, manager: nil, subs_pool: nil}} end @spec update_meta(String.t(), pid(), pid()) :: {:ok, {pid(), term()}} | {:error, term()} def update_meta(tenant, manager_pid, subs_pool) do - :syn.update_registry(__MODULE__, tenant, fn pid, meta -> + scope = Realtime.Syn.PostgresCdc.scope(tenant) + + :syn.update_registry(scope, tenant, fn pid, meta -> if node(pid) == node(manager_pid) do %{meta | manager: manager_pid, subs_pool: subs_pool} else @@ -148,6 +155,4 @@ defmodule Extensions.PostgresCdcRls do end end) end - - def syn_topic(tenant_id), do: "cdc_rls:#{tenant_id}" end diff --git a/lib/extensions/postgres_cdc_rls/supervisor.ex b/lib/extensions/postgres_cdc_rls/supervisor.ex index 21e124190..fc3701aeb 100644 --- a/lib/extensions/postgres_cdc_rls/supervisor.ex +++ b/lib/extensions/postgres_cdc_rls/supervisor.ex @@ -15,7 +15,7 @@ defmodule Extensions.PostgresCdcRls.Supervisor do def init(_args) do load_migrations_modules() - :syn.add_node_to_scopes([PostgresCdcRls]) + :syn.add_node_to_scopes(Realtime.Syn.PostgresCdc.scopes()) children = [ { diff --git a/lib/realtime/operations.ex b/lib/realtime/operations.ex index 76efa38fb..e17bf249e 100644 --- a/lib/realtime/operations.ex +++ b/lib/realtime/operations.ex @@ -9,7 +9,9 @@ defmodule Realtime.Operations do """ def rebalance do Enum.reduce(:syn.group_names(:users), 0, fn tenant, acc -> - case :syn.lookup(Extensions.PostgresCdcRls, tenant) do + scope = Realtime.Syn.PostgresCdc.scope(tenant) + + case :syn.lookup(scope, tenant) do {pid, %{region: region}} -> platform_region = Realtime.Nodes.platform_region_translator(region) current_node = node(pid) diff --git a/lib/realtime/syn/postgres_cdc.ex b/lib/realtime/syn/postgres_cdc.ex new file mode 100644 index 000000000..3b4dd6541 --- /dev/null +++ b/lib/realtime/syn/postgres_cdc.ex @@ -0,0 +1,23 @@ +defmodule Realtime.Syn.PostgresCdc do + @moduledoc """ + Scope for the PostgresCdc module. + """ + + @doc """ + Returns the scope for a given tenant id. + """ + @spec scope(String.t()) :: atom() + def scope(tenant_id) do + shards = Application.fetch_env!(:realtime, :postgres_cdc_scope_shards) + shard = :erlang.phash2(tenant_id, shards) + :"realtime_postgres_cdc_#{shard}" + end + + def scopes() do + shards = Application.fetch_env!(:realtime, :postgres_cdc_scope_shards) + Enum.map(0..(shards - 1), fn shard -> :"realtime_postgres_cdc_#{shard}" end) + end + + def syn_topic_prefix(), do: "realtime_postgres_cdc_" + def syn_topic(tenant_id), do: "#{syn_topic_prefix()}#{tenant_id}" +end diff --git a/lib/realtime/syn_handler.ex b/lib/realtime/syn_handler.ex index 24c8d78b4..659e257b2 100644 --- a/lib/realtime/syn_handler.ex +++ b/lib/realtime/syn_handler.ex @@ -3,9 +3,9 @@ defmodule Realtime.SynHandler do Custom defined Syn's callbacks """ require Logger - alias Extensions.PostgresCdcRls - alias RealtimeWeb.Endpoint + alias Realtime.Syn.PostgresCdc alias Realtime.Tenants.Connect + alias RealtimeWeb.Endpoint @behaviour :syn_event_handler @@ -15,12 +15,17 @@ defmodule Realtime.SynHandler do Endpoint.local_broadcast(Connect.syn_topic(tenant_id), "ready", %{pid: pid, conn: conn}) end - def on_registry_process_updated(PostgresCdcRls, tenant_id, _pid, meta, _reason) do - # Update that the CdCRls connection is ready - Endpoint.local_broadcast(PostgresCdcRls.syn_topic(tenant_id), "ready", meta) - end + def on_registry_process_updated(scope, tenant_id, _pid, meta, _reason) do + scope = Atom.to_string(scope) - def on_registry_process_updated(_scope, _name, _pid, _meta, _reason), do: :ok + case scope do + "realtime_postgres_cdc_" <> _ -> + Endpoint.local_broadcast(PostgresCdc.syn_topic(tenant_id), "ready", meta) + + _ -> + :ok + end + end @doc """ When processes registered with :syn are unregistered, either manually or by stopping, this @@ -31,14 +36,20 @@ defmodule Realtime.SynHandler do We want to log conflict resolutions to know when more than one process on the cluster was started, and subsequently stopped because :syn handled the conflict. """ + @postgres_cdc_scope_prefix PostgresCdc.syn_topic_prefix() @impl true - def on_process_unregistered(mod, name, pid, _meta, reason) do - if reason == :syn_conflict_resolution do - log("#{mod} terminated due to syn conflict resolution: #{inspect(name)} #{inspect(pid)}") + def on_process_unregistered(scope, name, pid, _meta, reason) do + case Atom.to_string(scope) do + @postgres_cdc_scope_prefix <> _ = scope -> + Endpoint.local_broadcast(PostgresCdc.syn_topic(name), scope <> "_down", %{pid: pid, reason: reason}) + + _ -> + topic = topic(scope) + Endpoint.local_broadcast(topic <> ":" <> name, topic <> "_down", %{pid: pid, reason: reason}) end - topic = topic(mod) - Endpoint.local_broadcast(topic <> ":" <> name, topic <> "_down", %{pid: pid, reason: reason}) + if reason == :syn_conflict_resolution, + do: log("#{scope} terminated due to syn conflict resolution: #{inspect(name)} #{inspect(pid)}") :ok end diff --git a/mix.exs b/mix.exs index f5195910e..08ec62d38 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.57.3", + version: "2.57.4", elixir: "~> 1.18", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/realtime/extensions/cdc_rls/cdc_rls_test.exs b/test/realtime/extensions/cdc_rls/cdc_rls_test.exs index 4f29a39c4..7892e3740 100644 --- a/test/realtime/extensions/cdc_rls/cdc_rls_test.exs +++ b/test/realtime/extensions/cdc_rls/cdc_rls_test.exs @@ -76,7 +76,7 @@ defmodule Realtime.Extensions.CdcRlsTest do metadata = [metadata: subscription_metadata] :ok = PostgresCdc.subscribe(PostgresCdcRls, pg_change_params, external_id, metadata) - RealtimeWeb.Endpoint.subscribe(PostgresCdcRls.syn_topic(tenant.external_id)) + RealtimeWeb.Endpoint.subscribe(Realtime.Syn.PostgresCdc.syn_topic(tenant.external_id)) # First time it will return nil PostgresCdcRls.handle_connect(args) # Wait for it to start @@ -88,14 +88,17 @@ defmodule Realtime.Extensions.CdcRlsTest do # Now subscribe to the Postgres Changes {:ok, _} = PostgresCdcRls.handle_after_connect(response, postgres_extension, pg_change_params) - RealtimeWeb.Endpoint.unsubscribe(PostgresCdcRls.syn_topic(tenant.external_id)) + RealtimeWeb.Endpoint.unsubscribe(Realtime.Syn.PostgresCdc.syn_topic(tenant.external_id)) %{tenant: tenant} end test "supervisor crash must not respawn", %{tenant: tenant} do + scope = Realtime.Syn.PostgresCdc.scope(tenant.external_id) + sup = Enum.reduce_while(1..30, nil, fn _, acc -> - :syn.lookup(Extensions.PostgresCdcRls, tenant.external_id) + scope + |> :syn.lookup(tenant.external_id) |> case do :undefined -> Process.sleep(500) @@ -109,16 +112,16 @@ defmodule Realtime.Extensions.CdcRlsTest do assert Process.alive?(sup) Process.monitor(sup) - RealtimeWeb.Endpoint.subscribe(PostgresCdcRls.syn_topic(tenant.external_id)) + RealtimeWeb.Endpoint.subscribe(Realtime.Syn.PostgresCdc.syn_topic(tenant.external_id)) Process.exit(sup, :kill) - assert_receive {:DOWN, _, :process, ^sup, _reason}, 5000 - - assert_receive %{event: "postgres_cdc_rls_down"} + scope_down = Atom.to_string(scope) <> "_down" + assert_receive {:DOWN, _, :process, ^sup, _reason}, 5000 + assert_receive %{event: ^scope_down} refute_receive %{event: "ready"}, 1000 - :undefined = :syn.lookup(Extensions.PostgresCdcRls, tenant.external_id) + :undefined = :syn.lookup(Realtime.Syn.PostgresCdc.scope(tenant.external_id), tenant.external_id) end test "Subscription manager updates oids", %{tenant: tenant} do @@ -150,7 +153,10 @@ defmodule Realtime.Extensions.CdcRlsTest do test "Stop tenant supervisor", %{tenant: tenant} do sup = Enum.reduce_while(1..10, nil, fn _, acc -> - case :syn.lookup(Extensions.PostgresCdcRls, tenant.external_id) do + tenant.external_id + |> Realtime.Syn.PostgresCdc.scope() + |> :syn.lookup(tenant.external_id) + |> case do :undefined -> Process.sleep(500) {:cont, acc} diff --git a/test/realtime/extensions/cdc_rls/subscription_manager_test.exs b/test/realtime/extensions/cdc_rls/subscription_manager_test.exs index 7b92afcd8..3fbde34b5 100644 --- a/test/realtime/extensions/cdc_rls/subscription_manager_test.exs +++ b/test/realtime/extensions/cdc_rls/subscription_manager_test.exs @@ -18,7 +18,9 @@ defmodule Realtime.Extensions.CdcRls.SubscriptionManagerTest do |> Map.put("subscribers_nodes_table", subscribers_nodes_table) # register this process with syn as if this was the WorkersSupervisor - :syn.register(PostgresCdcRls, tenant.external_id, self(), %{region: "us-east-1", manager: nil, subs_pool: nil}) + + scope = Realtime.Syn.PostgresCdc.scope(tenant.external_id) + :syn.register(scope, tenant.external_id, self(), %{region: "us-east-1", manager: nil, subs_pool: nil}) {:ok, pid} = SubscriptionManager.start_link(Map.put(args, "id", tenant.external_id)) # This serves so that we know that handle_continue has finished diff --git a/test/realtime/syn_handler_test.exs b/test/realtime/syn_handler_test.exs index 7caf80625..96a2e316a 100644 --- a/test/realtime/syn_handler_test.exs +++ b/test/realtime/syn_handler_test.exs @@ -271,7 +271,7 @@ defmodule Realtime.SynHandlerTest do end end - defp assert_process_down(pid, reason \\ nil, timeout \\ 100) do + defp assert_process_down(pid, reason, timeout) do ref = Process.monitor(pid) if reason do