Skip to content

Commit

Permalink
fix: ensure DB change broadcast payload works for existing and new api
Browse files Browse the repository at this point in the history
  • Loading branch information
w3b6x9 committed Jun 7, 2022
1 parent c526a4d commit 69bcc4c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 14 deletions.
10 changes: 1 addition & 9 deletions lib/extensions/postgres/postgres_subscribers_notification.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
defmodule Extensions.Postgres.SubscribersNotification do
require Logger

alias Phoenix.Socket.Broadcast
alias Realtime.{MessageDispatcher, PubSub}

def broadcast_change(topic, %{subscription_ids: subscription_ids} = change) do
Expand All @@ -10,18 +9,11 @@ defmodule Extensions.Postgres.SubscribersNotification do
!Enum.member?([:is_rls_enabled, :subscription_ids], key)
end)

broadcast = %Broadcast{
# MessageDispatcher updates topic field
topic: "",
event: "realtime",
payload: %{payload: payload, event: payload.type}
}

Phoenix.PubSub.broadcast_from(
PubSub,
self(),
topic,
{broadcast, subscription_ids},
{payload, subscription_ids},
MessageDispatcher
)
end
Expand Down
23 changes: 21 additions & 2 deletions lib/realtime/message_dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,31 @@ defmodule Realtime.MessageDispatcher do
@doc """
Hook invoked by Phoenix.PubSub dispatch.
"""

alias Phoenix.Socket.Broadcast

def dispatch([_ | _] = topic_subscriptions, _from, {payload, subscription_ids}) do
_ =
Enum.reduce(topic_subscriptions, %{}, fn
{_pid, {:subscriber_fastlane, fastlane_pid, serializer, id, join_topic}}, cache ->
{_pid, {:subscriber_fastlane, fastlane_pid, serializer, id, join_topic, is_new_api}},
cache ->
if MapSet.member?(subscription_ids, id) do
broadcast_message(cache, fastlane_pid, %{payload | topic: join_topic}, serializer)
new_payload =
if is_new_api do
%Broadcast{
topic: join_topic,
event: "realtime",
payload: %{payload: payload, event: payload.type}
}
else
%Broadcast{
topic: join_topic,
event: payload.type,
payload: payload
}
end

broadcast_message(cache, fastlane_pid, new_payload, serializer)
else
cache
end
Expand Down
10 changes: 7 additions & 3 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,22 @@ defmodule RealtimeWeb.RealtimeChannel do
postgres_topic = topic_from_config(params)
Logger.info("Postgres_topic is " <> postgres_topic)

realtime_configs = params["configs"]["realtime"]

postgres_config =
if postgres_topic != "" || !params["configs"]["realtime"] do
if postgres_topic != "" || !realtime_configs do
Endpoint.unsubscribe(topic)

metadata = [
metadata: {:subscriber_fastlane, pid, serializer, UUID.string_to_binary!(id), topic}
metadata:
{:subscriber_fastlane, pid, serializer, UUID.string_to_binary!(id), topic,
!!realtime_configs}
]

Endpoint.subscribe("realtime:postgres:" <> tenant, metadata)

postgres_config =
case params["configs"]["realtime"]["filter"] do
case realtime_configs["filter"] do
nil ->
case String.split(sub_topic, ":") do
[schema] ->
Expand Down

0 comments on commit 69bcc4c

Please sign in to comment.