Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/extensions/postgres_cdc_rls/cdc_rls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ defmodule Extensions.PostgresCdcRls do
Start db poller. Expects an `external_id` as a `tenant`.
"""

@spec start(map()) :: :ok | {:error, :already_started | :reserved}
@spec start(map()) :: {:ok, pid} | {:error, :already_started | :reserved}
def start(%{"id" => tenant} = args) when is_binary(tenant) do
args = Map.merge(args, %{"subs_pool_size" => Map.get(args, "subcriber_pool_size", 4)})

Expand Down
25 changes: 16 additions & 9 deletions lib/extensions/postgres_cdc_rls/replication_poller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
alias Realtime.Adapters.Changes.NewRecord
alias Realtime.Adapters.Changes.UpdatedRecord
alias Realtime.Database
alias Realtime.PubSub

def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

@impl true
def init(args) do
tenant = args["id"]
Logger.metadata(external_id: tenant, project: tenant)
tenant_id = args["id"]
Logger.metadata(external_id: tenant_id, project: tenant_id)

tenant = Realtime.Tenants.Cache.get_tenant_by_external_id(tenant_id)

state = %{
backoff: Backoff.new(backoff_min: 100, backoff_max: 5_000, backoff_type: :rand_exp),
Expand All @@ -45,7 +46,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
tenant: tenant
}

{:ok, _} = Registry.register(__MODULE__.Registry, tenant, %{})
{:ok, _} = Registry.register(__MODULE__.Registry, tenant_id, %{})
{:ok, state, {:continue, {:connect, args}}}
end

Expand Down Expand Up @@ -83,7 +84,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do

args = [conn, slot_name, publication, max_changes, max_record_bytes]
{time, list_changes} = :timer.tc(Replications, :list_changes, args)
record_list_changes_telemetry(time, tenant)
record_list_changes_telemetry(time, tenant.external_id)

case handle_list_changes_result(list_changes, tenant) do
{:ok, row_count} ->
Expand Down Expand Up @@ -163,11 +164,11 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
end
end

defp record_list_changes_telemetry(time, tenant) do
defp record_list_changes_telemetry(time, tenant_id) do
Realtime.Telemetry.execute(
[:realtime, :replication, :poller, :query, :stop],
%{duration: time},
%{tenant: tenant}
%{tenant: tenant_id}
)
end

Expand All @@ -182,8 +183,14 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
) do
for row <- rows,
change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
topic = "realtime:postgres:" <> tenant
Phoenix.PubSub.broadcast_from(PubSub, self(), topic, change, MessageDispatcher)
topic = "realtime:postgres:" <> tenant.external_id

RealtimeWeb.TenantBroadcaster.pubsub_broadcast(
tenant,
topic,
change,
MessageDispatcher
)
end

{:ok, rows_count}
Expand Down
9 changes: 9 additions & 0 deletions lib/realtime_web/tenant_broadcaster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ defmodule RealtimeWeb.TenantBroadcaster do
"""

alias Phoenix.Endpoint
alias Phoenix.PubSub
alias Realtime.Api.Tenant

@callback broadcast(Endpoint.topic(), Endpoint.event(), Endpoint.msg()) :: :ok | {:error, term()}
@callback broadcast_from(from :: pid(), Endpoint.topic(), Endpoint.event(), Endpoint.msg()) :: :ok | {:error, term()}
@callback pubsub_broadcast(PubSub.topic(), PubSub.message(), PubSub.dispatcher()) ::
:ok | {:error, term()}

@spec broadcast(tenant :: Tenant.t(), Endpoint.topic(), Endpoint.event(), Endpoint.msg()) :: :ok
def broadcast(tenant, topic, event, msg) do
Expand All @@ -19,6 +22,12 @@ defmodule RealtimeWeb.TenantBroadcaster do
adapter(tenant.broadcast_adapter).broadcast_from(from, topic, event, msg)
end

@spec pubsub_broadcast(tenant :: Tenant.t(), PubSub.topic(), PubSub.message(), PubSub.dispatcher()) ::
:ok | {:error, term}
def pubsub_broadcast(tenant, topic, message, dispatcher) do
adapter(tenant.broadcast_adapter).pubsub_broadcast(topic, message, dispatcher)
end

defp adapter(:gen_rpc), do: RealtimeWeb.TenantBroadcaster.GenRpc
defp adapter(_), do: RealtimeWeb.TenantBroadcaster.Phoenix
end
9 changes: 9 additions & 0 deletions lib/realtime_web/tenant_broadcaster/gen_rpc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,13 @@ defmodule RealtimeWeb.TenantBroadcaster.GenRpc do
Realtime.GenRpc.multicast(RealtimeWeb.Endpoint, :local_broadcast_from, [from, topic, event, msg], key: topic)
:ok
end

@impl true
def pubsub_broadcast(topic, message, dispatcher) do
Realtime.GenRpc.multicast(Phoenix.PubSub, :local_broadcast, [Realtime.PubSub, topic, message, dispatcher],
key: topic
)

:ok
end
end
5 changes: 5 additions & 0 deletions lib/realtime_web/tenant_broadcaster/phoenix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,9 @@ defmodule RealtimeWeb.TenantBroadcaster.Phoenix do

@impl true
def broadcast_from(from, topic, event, msg), do: Endpoint.broadcast_from(from, topic, event, msg)

@impl true
def pubsub_broadcast(topic, message, dispatcher) do
Phoenix.PubSub.broadcast(Realtime.PubSub, topic, message, dispatcher)
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.40.2",
version: "2.40.3",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
Loading
Loading