Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 8 additions & 15 deletions lib/extensions/postgres_cdc_rls/replication_poller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
tenant_id = args["id"]
Logger.metadata(external_id: tenant_id, project: tenant_id)

tenant = Realtime.Tenants.Cache.get_tenant_by_external_id(tenant_id)

state = %{
backoff: Backoff.new(backoff_min: 100, backoff_max: 5_000, backoff_type: :rand_exp),
db_host: args["db_host"],
Expand All @@ -43,7 +41,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
retry_ref: nil,
retry_count: 0,
slot_name: args["slot_name"] <> slot_name_suffix(),
tenant: tenant
tenant_id: tenant_id
}

{:ok, _} = Registry.register(__MODULE__.Registry, tenant_id, %{})
Expand Down Expand Up @@ -76,17 +74,17 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
max_record_bytes: max_record_bytes,
max_changes: max_changes,
conn: conn,
tenant: tenant
tenant_id: tenant_id
} = state
) do
cancel_timer(poll_ref)
cancel_timer(retry_ref)

args = [conn, slot_name, publication, max_changes, max_record_bytes]
{time, list_changes} = :timer.tc(Replications, :list_changes, args)
record_list_changes_telemetry(time, tenant.external_id)
record_list_changes_telemetry(time, tenant_id)

case handle_list_changes_result(list_changes, tenant) do
case handle_list_changes_result(list_changes, tenant_id) do
{:ok, row_count} ->
Backoff.reset(backoff)

Expand Down Expand Up @@ -179,18 +177,13 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
rows: [_ | _] = rows,
num_rows: rows_count
}},
tenant
tenant_id
) do
for row <- rows,
change <- columns |> Enum.zip(row) |> generate_record() |> List.wrap() do
topic = "realtime:postgres:" <> tenant.external_id

RealtimeWeb.TenantBroadcaster.pubsub_broadcast(
tenant,
topic,
change,
MessageDispatcher
)
topic = "realtime:postgres:" <> tenant_id

RealtimeWeb.TenantBroadcaster.pubsub_broadcast(topic, change, MessageDispatcher)
end

{:ok, rows_count}
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/tenants/batch_broadcast.ex
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ defmodule Realtime.Tenants.BatchBroadcast do
broadcast = %Phoenix.Socket.Broadcast{topic: topic, event: @event_type, payload: payload}

GenCounter.add(events_per_second_rate.id)
TenantBroadcaster.pubsub_broadcast(tenant, tenant_topic, broadcast, RealtimeChannel.MessageDispatcher)
TenantBroadcaster.pubsub_broadcast(tenant_topic, broadcast, RealtimeChannel.MessageDispatcher)
end

defp permissions_for_message(_, nil, _), do: nil
Expand Down
48 changes: 16 additions & 32 deletions lib/realtime_web/channels/realtime_channel/broadcast_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,10 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
alias RealtimeWeb.RealtimeChannel
alias RealtimeWeb.TenantBroadcaster
alias Phoenix.Socket
alias Realtime.Api.Tenant
alias Realtime.GenCounter
alias Realtime.Tenants.Authorization
alias Realtime.Tenants.Authorization.Policies
alias Realtime.Tenants.Authorization.Policies.BroadcastPolicies
alias Realtime.Tenants.Cache

@event_type "broadcast"
@spec handle(map(), Socket.t()) :: {:reply, :ok, Socket.t()} | {:noreply, Socket.t()}
Expand All @@ -38,8 +36,8 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
|> assign(:policies, policies)
|> increment_rate_counter()

%{ack_broadcast: ack_broadcast, tenant: tenant_id} = socket.assigns
send_message(tenant_id, self_broadcast, tenant_topic, payload)
%{ack_broadcast: ack_broadcast} = socket.assigns
send_message(self_broadcast, tenant_topic, payload)
if ack_broadcast, do: {:reply, :ok, socket}, else: {:noreply, socket}

{:ok, policies} ->
Expand All @@ -56,42 +54,28 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandler do
end

def handle(payload, _db_conn, %{assigns: %{private?: false}} = socket) do
%{
assigns: %{
tenant_topic: tenant_topic,
self_broadcast: self_broadcast,
ack_broadcast: ack_broadcast,
tenant: tenant_id
}
} = socket
%{assigns: %{tenant_topic: tenant_topic, self_broadcast: self_broadcast, ack_broadcast: ack_broadcast}} = socket

socket = increment_rate_counter(socket)
send_message(tenant_id, self_broadcast, tenant_topic, payload)
send_message(self_broadcast, tenant_topic, payload)

if ack_broadcast,
do: {:reply, :ok, socket},
else: {:noreply, socket}
end

defp send_message(tenant_id, self_broadcast, tenant_topic, payload) do
with %Tenant{} = tenant <- Cache.get_tenant_by_external_id(tenant_id) do
broadcast = %Phoenix.Socket.Broadcast{
topic: tenant_topic,
event: @event_type,
payload: payload
}

if self_broadcast do
TenantBroadcaster.pubsub_broadcast(tenant, tenant_topic, broadcast, RealtimeChannel.MessageDispatcher)
else
TenantBroadcaster.pubsub_broadcast_from(
tenant,
self(),
tenant_topic,
broadcast,
RealtimeChannel.MessageDispatcher
)
end
defp send_message(self_broadcast, tenant_topic, payload) do
broadcast = %Phoenix.Socket.Broadcast{topic: tenant_topic, event: @event_type, payload: payload}

if self_broadcast do
TenantBroadcaster.pubsub_broadcast(tenant_topic, broadcast, RealtimeChannel.MessageDispatcher)
else
TenantBroadcaster.pubsub_broadcast_from(
self(),
tenant_topic,
broadcast,
RealtimeChannel.MessageDispatcher
)
end
end

Expand Down
50 changes: 24 additions & 26 deletions lib/realtime_web/tenant_broadcaster.ex
Original file line number Diff line number Diff line change
@@ -1,41 +1,39 @@
defmodule RealtimeWeb.TenantBroadcaster do
@moduledoc """
Interface to broadcast messages to tenants
gen_rpc broadcaster
"""

alias Phoenix.Endpoint
alias Phoenix.PubSub
alias Realtime.Api.Tenant

@callback broadcast(Endpoint.topic(), Endpoint.event(), Endpoint.msg()) :: :ok | {:error, term()}
@callback broadcast_from(from :: pid(), Endpoint.topic(), Endpoint.event(), Endpoint.msg()) :: :ok | {:error, term()}
@callback pubsub_broadcast(PubSub.topic(), PubSub.message(), PubSub.dispatcher()) ::
:ok | {:error, term()}
@callback pubsub_broadcast_from(from :: pid, PubSub.topic(), PubSub.message(), PubSub.dispatcher()) ::
:ok | {:error, term()}

@spec broadcast(tenant :: Tenant.t(), Endpoint.topic(), Endpoint.event(), Endpoint.msg()) :: :ok
def broadcast(tenant, topic, event, msg) do
adapter(tenant.broadcast_adapter).broadcast(topic, event, msg)
@spec broadcast(Endpoint.topic(), Endpoint.event(), Endpoint.msg()) :: :ok
def broadcast(topic, event, msg) do
Realtime.GenRpc.multicast(RealtimeWeb.Endpoint, :local_broadcast, [topic, event, msg], key: topic)
:ok
end

@spec broadcast_from(tenant :: Tenant.t(), from :: pid, Endpoint.topic(), Endpoint.event(), Endpoint.msg()) :: :ok
def broadcast_from(tenant, from, topic, event, msg) do
adapter(tenant.broadcast_adapter).broadcast_from(from, topic, event, msg)
@spec broadcast_from(from :: pid, Endpoint.topic(), Endpoint.event(), Endpoint.msg()) :: :ok
def broadcast_from(from, topic, event, msg) do
Realtime.GenRpc.multicast(RealtimeWeb.Endpoint, :local_broadcast_from, [from, topic, event, msg], key: topic)
:ok
end

@spec pubsub_broadcast(tenant :: Tenant.t(), PubSub.topic(), PubSub.message(), PubSub.dispatcher()) ::
:ok | {:error, term}
def pubsub_broadcast(tenant, topic, message, dispatcher) do
adapter(tenant.broadcast_adapter).pubsub_broadcast(topic, message, dispatcher)
end
@spec pubsub_broadcast(PubSub.topic(), PubSub.message(), PubSub.dispatcher()) :: :ok
def pubsub_broadcast(topic, message, dispatcher) do
Realtime.GenRpc.multicast(PubSub, :local_broadcast, [Realtime.PubSub, topic, message, dispatcher], key: topic)

@spec pubsub_broadcast_from(tenant :: Tenant.t(), from :: pid, PubSub.topic(), PubSub.message(), PubSub.dispatcher()) ::
:ok | {:error, term}
def pubsub_broadcast_from(tenant, from, topic, message, dispatcher) do
adapter(tenant.broadcast_adapter).pubsub_broadcast_from(from, topic, message, dispatcher)
:ok
end

defp adapter(:gen_rpc), do: RealtimeWeb.TenantBroadcaster.GenRpc
defp adapter(_), do: RealtimeWeb.TenantBroadcaster.Phoenix
@spec pubsub_broadcast_from(from :: pid, PubSub.topic(), PubSub.message(), PubSub.dispatcher()) :: :ok
def pubsub_broadcast_from(from, topic, message, dispatcher) do
Realtime.GenRpc.multicast(
PubSub,
:local_broadcast_from,
[Realtime.PubSub, from, topic, message, dispatcher],
key: topic
)

:ok
end
end
38 changes: 0 additions & 38 deletions lib/realtime_web/tenant_broadcaster/gen_rpc.ex

This file was deleted.

23 changes: 0 additions & 23 deletions lib/realtime_web/tenant_broadcaster/phoenix.ex

This file was deleted.

2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.41.23",
version: "2.41.24",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
4 changes: 1 addition & 3 deletions test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -530,12 +530,10 @@ defmodule Realtime.Integration.RtChannelTest do

@tag policies: [:authenticated_read_broadcast_and_presence, :authenticated_write_broadcast_and_presence],
mode: :distributed
test "private broadcast with valid channel with permissions sends message using a remote node (gen_rpc adapter)", %{
test "private broadcast with valid channel with permissions sends message using a remote node", %{
tenant: tenant,
topic: topic
} do
{:ok, tenant} = Realtime.Api.update_tenant(tenant, %{broadcast_adapter: :gen_rpc})

{:ok, token} =
generate_token(tenant, %{exp: System.system_time(:second) + 1000, role: "authenticated", sub: random_string()})

Expand Down
Loading
Loading