Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ You can add your own by making a `POST` request to the server. You must change b
"region": "us-west-1",
"poll_interval_ms": 100,
"poll_max_record_bytes": 1048576,
"ssl_enforced": false
"ssl_enforced": false
}
}
]
Expand Down Expand Up @@ -284,6 +284,7 @@ This is the list of operational codes that can help you understand your deployme
| UnknownErrorOnController | An error we are not handling correctly was triggered on a controller |
| UnknownErrorOnChannel | An error we are not handling correctly was triggered on a channel |
| PresenceRateLimitReached | Limit of presence events reached |
| UnableToReplayMessages | An error while replaying messages |

## License

Expand Down
4 changes: 3 additions & 1 deletion lib/realtime/api/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ defmodule Realtime.Api.Message do
@primary_key {:id, Ecto.UUID, autogenerate: true}
@schema_prefix "realtime"

@type t :: %__MODULE__{}

schema "messages" do
field(:topic, :string)
field(:extension, Ecto.Enum, values: [:broadcast, :presence])
Expand Down Expand Up @@ -39,7 +41,7 @@ defmodule Realtime.Api.Message do
end

defp maybe_put_timestamp(changeset, field) do
case Map.get(changeset.data, field) do
case get_field(changeset, field) do
nil -> put_timestamp(changeset, field)
_ -> changeset
end
Expand Down
55 changes: 55 additions & 0 deletions lib/realtime/messages.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,61 @@ defmodule Realtime.Messages do
Handles `realtime.messages` table operations
"""

alias Realtime.Api.Message

import Ecto.Query, only: [from: 2]

@hard_limit 25
@default_timeout 5_000

@doc """
Fetch last `limit ` messages for a given `topic` inserted after `since`

Automatically uses RPC if the database connection is not in the same node

Only allowed for private channels
"""
@spec replay(pid, String.t(), non_neg_integer, non_neg_integer) ::
{:ok, Message.t(), [String.t()]} | {:error, term} | {:error, :rpc_error, term}
def replay(conn, topic, since, limit) when node(conn) == node() and is_integer(since) and is_integer(limit) do
limit = max(min(limit, @hard_limit), 1)

with {:ok, since} <- DateTime.from_unix(since, :millisecond),
{:ok, messages} <- messages(conn, topic, since, limit) do
{:ok, Enum.reverse(messages), MapSet.new(messages, & &1.id)}
else
{:error, :postgrex_exception} -> {:error, :failed_to_replay_messages}
{:error, :invalid_unix_time} -> {:error, :invalid_replay_params}
error -> error
end
end

def replay(conn, topic, since, limit) when is_integer(since) and is_integer(limit) do
Realtime.GenRpc.call(node(conn), __MODULE__, :replay, [conn, topic, since, limit], key: topic)
end

def replay(_, _, _, _), do: {:error, :invalid_replay_params}

defp messages(conn, topic, since, limit) do
since = DateTime.to_naive(since)
# We want to avoid searching partitions in the future as they should be empty
# so we limit to 1 minute in the future to account for any potential drift
now = NaiveDateTime.utc_now() |> NaiveDateTime.add(1, :minute)

query =
from m in Message,
where:
m.topic == ^topic and
m.private == true and
m.extension == :broadcast and
m.inserted_at >= ^since and
m.inserted_at < ^now,
limit: ^limit,
order_by: [desc: m.inserted_at]

Realtime.Repo.all(conn, query, Message, timeout: @default_timeout)
end

@doc """
Deletes messages older than 72 hours for a given tenant connection
"""
Expand Down
38 changes: 24 additions & 14 deletions lib/realtime/tenants/batch_broadcast.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ defmodule Realtime.Tenants.BatchBroadcast do
@spec broadcast(
auth_params :: map() | nil,
tenant :: Tenant.t(),
messages :: %{messages: list(%{topic: String.t(), payload: map(), event: String.t(), private: boolean()})},
messages :: %{
messages: list(%{id: String.t(), topic: String.t(), payload: map(), event: String.t(), private: boolean()})
},
super_user :: boolean()
) :: :ok | {:error, atom()}
def broadcast(auth_params, tenant, messages, super_user \\ false)
Expand Down Expand Up @@ -59,8 +61,8 @@ defmodule Realtime.Tenants.BatchBroadcast do
# Handle events for public channel
events
|> Map.get(false, [])
|> Enum.each(fn %{topic: sub_topic, payload: payload, event: event} ->
send_message_and_count(tenant, events_per_second_rate, sub_topic, event, payload, true)
|> Enum.each(fn message ->
send_message_and_count(tenant, events_per_second_rate, message, true)
end)

# Handle events for private channel
Expand All @@ -69,14 +71,14 @@ defmodule Realtime.Tenants.BatchBroadcast do
|> Enum.group_by(fn event -> Map.get(event, :topic) end)
|> Enum.each(fn {topic, events} ->
if super_user do
Enum.each(events, fn %{topic: sub_topic, payload: payload, event: event} ->
send_message_and_count(tenant, events_per_second_rate, sub_topic, event, payload, false)
Enum.each(events, fn message ->
send_message_and_count(tenant, events_per_second_rate, message, false)
end)
else
case permissions_for_message(tenant, auth_params, topic) do
%Policies{broadcast: %BroadcastPolicies{write: true}} ->
Enum.each(events, fn %{topic: sub_topic, payload: payload, event: event} ->
send_message_and_count(tenant, events_per_second_rate, sub_topic, event, payload, false)
Enum.each(events, fn message ->
send_message_and_count(tenant, events_per_second_rate, message, false)
end)

_ ->
Expand All @@ -91,15 +93,15 @@ defmodule Realtime.Tenants.BatchBroadcast do

def broadcast(_, nil, _, _), do: {:error, :tenant_not_found}

def changeset(payload, attrs) do
defp changeset(payload, attrs) do
payload
|> cast(attrs, [])
|> cast_embed(:messages, required: true, with: &message_changeset/2)
end

def message_changeset(message, attrs) do
defp message_changeset(message, attrs) do
message
|> cast(attrs, [:topic, :payload, :event, :private])
|> cast(attrs, [:id, :topic, :payload, :event, :private])
|> maybe_put_private_change()
|> validate_required([:topic, :payload, :event])
end
Expand All @@ -112,11 +114,19 @@ defmodule Realtime.Tenants.BatchBroadcast do
end

@event_type "broadcast"
defp send_message_and_count(tenant, events_per_second_rate, topic, event, payload, public?) do
tenant_topic = Tenants.tenant_topic(tenant, topic, public?)
payload = %{"payload" => payload, "event" => event, "type" => "broadcast"}
defp send_message_and_count(tenant, events_per_second_rate, message, public?) do
tenant_topic = Tenants.tenant_topic(tenant, message.topic, public?)

broadcast = %Phoenix.Socket.Broadcast{topic: topic, event: @event_type, payload: payload}
payload = %{"payload" => message.payload, "event" => message.event, "type" => "broadcast"}

payload =
if message[:id] do
Map.put(payload, "meta", %{"id" => message.id})
else
payload
end

broadcast = %Phoenix.Socket.Broadcast{topic: message.topic, event: @event_type, payload: payload}

GenCounter.add(events_per_second_rate.id)
TenantBroadcaster.pubsub_broadcast(tenant.external_id, tenant_topic, broadcast, RealtimeChannel.MessageDispatcher)
Expand Down
6 changes: 4 additions & 2 deletions lib/realtime/tenants/migrations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ defmodule Realtime.Tenants.Migrations do
RealtimeSendSetsTopicConfig,
SubscriptionIndexBridgingDisabled,
RunSubscriptionIndexBridgingDisabled,
BroadcastSendErrorLogging
BroadcastSendErrorLogging,
CreateMessagesReplayIndex
}

@migrations [
Expand Down Expand Up @@ -140,7 +141,8 @@ defmodule Realtime.Tenants.Migrations do
{20_250_128_220_012, RealtimeSendSetsTopicConfig},
{20_250_506_224_012, SubscriptionIndexBridgingDisabled},
{20_250_523_164_012, RunSubscriptionIndexBridgingDisabled},
{20_250_714_121_412, BroadcastSendErrorLogging}
{20_250_714_121_412, BroadcastSendErrorLogging},
{20_250_905_041_441, CreateMessagesReplayIndex}
]

defstruct [:tenant_external_id, :settings]
Expand Down
8 changes: 7 additions & 1 deletion lib/realtime/tenants/replication_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,13 @@ defmodule Realtime.Tenants.ReplicationConnection do
{:ok, topic} <- get_or_error(to_broadcast, "topic", :topic_missing),
{:ok, private} <- get_or_error(to_broadcast, "private", :private_missing),
%Tenant{} = tenant <- Cache.get_tenant_by_external_id(tenant_id),
broadcast_message = %{topic: topic, event: event, private: private, payload: Map.put_new(payload, "id", id)},
broadcast_message = %{
id: id,
topic: topic,
event: event,
private: private,
payload: Map.put_new(payload, "id", id)
},
:ok <- BatchBroadcast.broadcast(nil, tenant, %{messages: [broadcast_message]}, true) do
inserted_at = NaiveDateTime.from_iso8601!(inserted_at)
latency_inserted_at = NaiveDateTime.utc_now() |> NaiveDateTime.diff(inserted_at)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule Realtime.Tenants.Migrations.CreateMessagesReplayIndex do
@moduledoc false

use Ecto.Migration

def change do
create_if_not_exists index(:messages, [{:desc, :inserted_at}, :topic],
where: "extension = 'broadcast' and private IS TRUE"
)
end
end
2 changes: 2 additions & 0 deletions lib/realtime_web/channels/payloads/broadcast.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ defmodule RealtimeWeb.Channels.Payloads.Broadcast do
embedded_schema do
field :ack, :boolean, default: false
field :self, :boolean, default: false
embeds_one :replay, RealtimeWeb.Channels.Payloads.Broadcast.Replay
end

def changeset(broadcast, attrs) do
cast(broadcast, attrs, [:ack, :self], message: &Join.error_message/2)
|> cast_embed(:replay, invalid_message: "unable to parse, expected a map")
end
end
17 changes: 17 additions & 0 deletions lib/realtime_web/channels/payloads/broadcast/replay.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule RealtimeWeb.Channels.Payloads.Broadcast.Replay do
@moduledoc """
Validate broadcast replay field of the join payload.
"""
use Ecto.Schema
import Ecto.Changeset
alias RealtimeWeb.Channels.Payloads.Join

embedded_schema do
field :limit, :integer, default: 10
field :since, :integer, default: 0
end

def changeset(broadcast, attrs) do
cast(broadcast, attrs, [:limit, :since], message: &Join.error_message/2)
end
end
51 changes: 49 additions & 2 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,21 @@ defmodule RealtimeWeb.RealtimeChannel do
{:ok, claims, confirm_token_ref} <- confirm_token(socket),
socket = assign_authorization_context(socket, sub_topic, claims),
{:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id),
{:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, socket) do
{:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, socket),
{:ok, replayed_message_ids} <-
maybe_replay_messages(params["config"], sub_topic, db_conn, socket.assigns.private?) do
tenant_topic = Tenants.tenant_topic(tenant_id, sub_topic, !socket.assigns.private?)

# fastlane subscription
metadata =
MessageDispatcher.fastlane_metadata(transport_pid, serializer, topic, socket.assigns.log_level, tenant_id)
MessageDispatcher.fastlane_metadata(
transport_pid,
serializer,
topic,
log_level,
tenant_id,
replayed_message_ids
)

RealtimeWeb.Endpoint.subscribe(tenant_topic, metadata: metadata)

Expand Down Expand Up @@ -198,13 +207,30 @@ defmodule RealtimeWeb.RealtimeChannel do
{:error, :shutdown_in_progress} ->
log_error(socket, "RealtimeRestarting", "Realtime is restarting, please standby")

{:error, :failed_to_replay_messages} ->
log_error(socket, "UnableToReplayMessages", "Realtime was unable to replay messages")

{:error, :invalid_replay_params} ->
log_error(socket, "UnableToReplayMessages", "Replay params are not valid")

{:error, error} ->
log_error(socket, "UnknownErrorOnChannel", error)
{:error, %{reason: "Unknown Error on Channel"}}
end
end

@impl true
def handle_info({:replay, messages}, socket) do
for message <- messages do
meta = %{"replayed" => true, "id" => message.id}
payload = %{"payload" => message.payload, "event" => message.event, "type" => "broadcast", "meta" => meta}

push(socket, "broadcast", payload)
end

{:noreply, socket}
end

def handle_info(:update_rate_counter, socket) do
count(socket)

Expand Down Expand Up @@ -762,4 +788,25 @@ defmodule RealtimeWeb.RealtimeChannel do
do: {:error, :private_only},
else: :ok
end

defp maybe_replay_messages(%{"broadcast" => %{"replay" => _}}, _sub_topic, _db_conn, false = _private?) do
{:error, :invalid_replay_params}
end

defp maybe_replay_messages(%{"broadcast" => %{"replay" => replay_params}}, sub_topic, db_conn, true = _private?)
when is_map(replay_params) do
with {:ok, messages, message_ids} <-
Realtime.Messages.replay(
db_conn,
sub_topic,
replay_params["since"],
replay_params["limit"] || 25
) do
# Send to self because we can't write to the socket before finishing the join process
send(self(), {:replay, messages})
{:ok, message_ids}
end
end

defp maybe_replay_messages(_, _, _, _), do: {:ok, MapSet.new()}
end
Loading
Loading