diff --git a/README.md b/README.md
index 2235bf388..6a16a79ba 100644
--- a/README.md
+++ b/README.md
@@ -94,7 +94,7 @@ You can add your own by making a `POST` request to the server. You must change b
         "region": "us-west-1",
         "poll_interval_ms": 100,
         "poll_max_record_bytes": 1048576,
-        "ssl_enforced": false
+        "ssl_enforced": false
       }
     }
   ]
@@ -284,6 +284,7 @@ This is the list of operational codes that can help you understand your deployme
 | UnknownErrorOnController  | An error we are not handling correctly was triggered on a controller |
 | UnknownErrorOnChannel     | An error we are not handling correctly was triggered on a channel    |
 | PresenceRateLimitReached  | Limit of presence events reached                                      |
+| UnableToReplayMessages    | An error occurred while replaying messages                            |
 
 ## License
 
diff --git a/lib/realtime/api/message.ex b/lib/realtime/api/message.ex
index 90ebc5bc9..18bbc9a87 100644
--- a/lib/realtime/api/message.ex
+++ b/lib/realtime/api/message.ex
@@ -8,6 +8,8 @@ defmodule Realtime.Api.Message do
   @primary_key {:id, Ecto.UUID, autogenerate: true}
   @schema_prefix "realtime"
 
+  @type t :: %__MODULE__{}
+
   schema "messages" do
     field(:topic, :string)
     field(:extension, Ecto.Enum, values: [:broadcast, :presence])
@@ -39,7 +41,7 @@ defmodule Realtime.Api.Message do
   end
 
   defp maybe_put_timestamp(changeset, field) do
-    case Map.get(changeset.data, field) do
+    case get_field(changeset, field) do
       nil -> put_timestamp(changeset, field)
       _ -> changeset
     end
diff --git a/lib/realtime/messages.ex b/lib/realtime/messages.ex
index c6d571db7..804a48d66 100644
--- a/lib/realtime/messages.ex
+++ b/lib/realtime/messages.ex
@@ -3,6 +3,61 @@ defmodule Realtime.Messages do
   Handles `realtime.messages` table operations
   """
 
+  alias Realtime.Api.Message
+
+  import Ecto.Query, only: [from: 2]
+
+  @hard_limit 25
+  @default_timeout 5_000
+
+  @doc """
+  Fetches the last `limit` messages for a given `topic` inserted after `since`.
+
+  Automatically uses RPC if the database connection is not on the same node.
+
+  Only allowed for private channels.
+  """
+  @spec replay(pid, String.t(), non_neg_integer, non_neg_integer) ::
+          {:ok, [Message.t()], MapSet.t(String.t())} | {:error, term} | {:error, :rpc_error, term}
+  def replay(conn, topic, since, limit) when node(conn) == node() and is_integer(since) and is_integer(limit) do
+    limit = max(min(limit, @hard_limit), 1)
+
+    with {:ok, since} <- DateTime.from_unix(since, :millisecond),
+         {:ok, messages} <- messages(conn, topic, since, limit) do
+      {:ok, Enum.reverse(messages), MapSet.new(messages, & &1.id)}
+    else
+      {:error, :postgrex_exception} -> {:error, :failed_to_replay_messages}
+      {:error, :invalid_unix_time} -> {:error, :invalid_replay_params}
+      error -> error
+    end
+  end
+
+  def replay(conn, topic, since, limit) when is_integer(since) and is_integer(limit) do
+    Realtime.GenRpc.call(node(conn), __MODULE__, :replay, [conn, topic, since, limit], key: topic)
+  end
+
+  def replay(_, _, _, _), do: {:error, :invalid_replay_params}
+
+  defp messages(conn, topic, since, limit) do
+    since = DateTime.to_naive(since)
+    # We want to avoid searching partitions in the future as they should be empty,
+    # so we limit to 1 minute in the future to account for any potential drift
+    now = NaiveDateTime.utc_now() |> NaiveDateTime.add(1, :minute)
+
+    query =
+      from m in Message,
+        where:
+          m.topic == ^topic and
+            m.private == true and
+            m.extension == :broadcast and
+            m.inserted_at >= ^since and
+            m.inserted_at < ^now,
+        limit: ^limit,
+        order_by: [desc: m.inserted_at]
+
+    Realtime.Repo.all(conn, query, Message, timeout: @default_timeout)
+  end
+
   @doc """
   Deletes messages older than 72 hours for a given tenant connection
   """
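For orientation, a minimal usage sketch of the new `Realtime.Messages.replay/4` (illustrative only, not part of the diff): the topic name and tenant id are placeholders, and `conn` is assumed to be the tenant database connection obtained via `Realtime.Tenants.Connect.lookup_or_start_connection/1`, mirroring how the channel and the tests further down call it.

```elixir
# Hypothetical caller; "tenant_external_id" and "room:1" are placeholder values.
{:ok, conn} = Realtime.Tenants.Connect.lookup_or_start_connection("tenant_external_id")

# Unix timestamp in milliseconds: replay private broadcasts from the last 5 minutes
since = System.system_time(:millisecond) - :timer.minutes(5)

case Realtime.Messages.replay(conn, "room:1", since, 10) do
  # Messages come back oldest-first, together with the MapSet of replayed ids
  {:ok, messages, replayed_ids} -> {Enum.map(messages, & &1.event), MapSet.size(replayed_ids)}
  {:error, :invalid_replay_params} -> :bad_params
  {:error, :failed_to_replay_messages} -> :database_error
end
```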
diff --git a/lib/realtime/tenants/batch_broadcast.ex b/lib/realtime/tenants/batch_broadcast.ex
index 4fc31aa0f..98427621b 100644
--- a/lib/realtime/tenants/batch_broadcast.ex
+++ b/lib/realtime/tenants/batch_broadcast.ex
@@ -29,7 +29,9 @@ defmodule Realtime.Tenants.BatchBroadcast do
   @spec broadcast(
           auth_params :: map() | nil,
           tenant :: Tenant.t(),
-          messages :: %{messages: list(%{topic: String.t(), payload: map(), event: String.t(), private: boolean()})},
+          messages :: %{
+            messages: list(%{id: String.t(), topic: String.t(), payload: map(), event: String.t(), private: boolean()})
+          },
          super_user :: boolean()
        ) :: :ok | {:error, atom()}
   def broadcast(auth_params, tenant, messages, super_user \\ false)
@@ -59,8 +61,8 @@
       # Handle events for public channel
       events
       |> Map.get(false, [])
-      |> Enum.each(fn %{topic: sub_topic, payload: payload, event: event} ->
-        send_message_and_count(tenant, events_per_second_rate, sub_topic, event, payload, true)
+      |> Enum.each(fn message ->
+        send_message_and_count(tenant, events_per_second_rate, message, true)
       end)
 
       # Handle events for private channel
@@ -69,14 +71,14 @@
       |> Enum.group_by(fn event -> Map.get(event, :topic) end)
       |> Enum.each(fn {topic, events} ->
         if super_user do
-          Enum.each(events, fn %{topic: sub_topic, payload: payload, event: event} ->
-            send_message_and_count(tenant, events_per_second_rate, sub_topic, event, payload, false)
+          Enum.each(events, fn message ->
+            send_message_and_count(tenant, events_per_second_rate, message, false)
           end)
         else
           case permissions_for_message(tenant, auth_params, topic) do
             %Policies{broadcast: %BroadcastPolicies{write: true}} ->
-              Enum.each(events, fn %{topic: sub_topic, payload: payload, event: event} ->
-                send_message_and_count(tenant, events_per_second_rate, sub_topic, event, payload, false)
+              Enum.each(events, fn message ->
+                send_message_and_count(tenant, events_per_second_rate, message, false)
               end)
 
             _ ->
@@ -91,15 +93,15 @@
 
   def broadcast(_, nil, _, _), do: {:error, :tenant_not_found}
 
-  def changeset(payload, attrs) do
+  defp changeset(payload, attrs) do
     payload
     |> cast(attrs, [])
     |> cast_embed(:messages, required: true, with: &message_changeset/2)
   end
 
-  def message_changeset(message, attrs) do
+  defp message_changeset(message, attrs) do
     message
-    |> cast(attrs, [:topic, :payload, :event, :private])
+    |> cast(attrs, [:id, :topic, :payload, :event, :private])
     |> maybe_put_private_change()
     |> validate_required([:topic, :payload, :event])
   end
@@ -112,11 +114,19 @@
   end
 
   @event_type "broadcast"
-  defp send_message_and_count(tenant, events_per_second_rate, topic, event, payload, public?) do
-    tenant_topic = Tenants.tenant_topic(tenant, topic, public?)
-    payload = %{"payload" => payload, "event" => event, "type" => "broadcast"}
+  defp send_message_and_count(tenant, events_per_second_rate, message, public?) do
+    tenant_topic = Tenants.tenant_topic(tenant, message.topic, public?)
 
-    broadcast = %Phoenix.Socket.Broadcast{topic: topic, event: @event_type, payload: payload}
+    payload = %{"payload" => message.payload, "event" => message.event, "type" => "broadcast"}
+
+    payload =
+      if message[:id] do
+        Map.put(payload, "meta", %{"id" => message.id})
+      else
+        payload
+      end
+
+    broadcast = %Phoenix.Socket.Broadcast{topic: message.topic, event: @event_type, payload: payload}
 
     GenCounter.add(events_per_second_rate.id)
     TenantBroadcaster.pubsub_broadcast(tenant.external_id, tenant_topic, broadcast, RealtimeChannel.MessageDispatcher)
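Illustration (not part of the diff): with the change above, a broadcast that carries a message `id` — e.g. one fanned out by `ReplicationConnection` below — now reaches subscribers with a `"meta"` key. The values here are hypothetical; the shape follows `send_message_and_count/4` above and the assertions in `replication_connection_test.exs` further down.

```elixir
%Phoenix.Socket.Broadcast{
  topic: "room:1",        # sub-topic of the tenant topic (placeholder value)
  event: "broadcast",
  payload: %{
    "type" => "broadcast",
    "event" => "INSERT",
    "payload" => %{"value" => "something", "id" => "<message uuid>"},
    # New: the database id of the message, later used to skip re-delivery to
    # sockets that already received it during replay on join
    "meta" => %{"id" => "<message uuid>"}
  }
}
```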
diff --git a/lib/realtime/tenants/migrations.ex b/lib/realtime/tenants/migrations.ex
index 04475c2b7..a5fa1eb8b 100644
--- a/lib/realtime/tenants/migrations.ex
+++ b/lib/realtime/tenants/migrations.ex
@@ -74,7 +74,8 @@ defmodule Realtime.Tenants.Migrations do
     RealtimeSendSetsTopicConfig,
     SubscriptionIndexBridgingDisabled,
     RunSubscriptionIndexBridgingDisabled,
-    BroadcastSendErrorLogging
+    BroadcastSendErrorLogging,
+    CreateMessagesReplayIndex
   }
 
   @migrations [
@@ -140,7 +141,8 @@
     {20_250_128_220_012, RealtimeSendSetsTopicConfig},
     {20_250_506_224_012, SubscriptionIndexBridgingDisabled},
     {20_250_523_164_012, RunSubscriptionIndexBridgingDisabled},
-    {20_250_714_121_412, BroadcastSendErrorLogging}
+    {20_250_714_121_412, BroadcastSendErrorLogging},
+    {20_250_905_041_441, CreateMessagesReplayIndex}
   ]
 
   defstruct [:tenant_external_id, :settings]
diff --git a/lib/realtime/tenants/replication_connection.ex b/lib/realtime/tenants/replication_connection.ex
index 58b1de191..4ebb1f8e8 100644
--- a/lib/realtime/tenants/replication_connection.ex
+++ b/lib/realtime/tenants/replication_connection.ex
@@ -310,7 +310,13 @@
          {:ok, topic} <- get_or_error(to_broadcast, "topic", :topic_missing),
          {:ok, private} <- get_or_error(to_broadcast, "private", :private_missing),
          %Tenant{} = tenant <- Cache.get_tenant_by_external_id(tenant_id),
-         broadcast_message = %{topic: topic, event: event, private: private, payload: Map.put_new(payload, "id", id)},
+         broadcast_message = %{
+           id: id,
+           topic: topic,
+           event: event,
+           private: private,
+           payload: Map.put_new(payload, "id", id)
+         },
          :ok <- BatchBroadcast.broadcast(nil, tenant, %{messages: [broadcast_message]}, true) do
       inserted_at = NaiveDateTime.from_iso8601!(inserted_at)
       latency_inserted_at = NaiveDateTime.utc_now() |> NaiveDateTime.diff(inserted_at)
diff --git a/lib/realtime/tenants/repo/migrations/20250905041441_create_messages_replay_index.ex b/lib/realtime/tenants/repo/migrations/20250905041441_create_messages_replay_index.ex
new file mode 100644
index 000000000..77afde6e0
--- /dev/null
+++ b/lib/realtime/tenants/repo/migrations/20250905041441_create_messages_replay_index.ex
@@ -0,0 +1,11 @@
+defmodule Realtime.Tenants.Migrations.CreateMessagesReplayIndex do
+  @moduledoc false
+
+  use Ecto.Migration
+
+  def change do
+    create_if_not_exists index(:messages, [{:desc, :inserted_at}, :topic],
+                           where: "extension = 'broadcast' and private IS TRUE"
+                         )
+  end
+end
diff --git a/lib/realtime_web/channels/payloads/broadcast.ex b/lib/realtime_web/channels/payloads/broadcast.ex
index 7feddb043..e2881fd54 100644
--- a/lib/realtime_web/channels/payloads/broadcast.ex
+++ b/lib/realtime_web/channels/payloads/broadcast.ex
@@ -9,9 +9,11 @@ defmodule RealtimeWeb.Channels.Payloads.Broadcast do
   embedded_schema do
     field :ack, :boolean, default: false
     field :self, :boolean, default: false
+    embeds_one :replay, RealtimeWeb.Channels.Payloads.Broadcast.Replay
   end
 
   def changeset(broadcast, attrs) do
     cast(broadcast, attrs, [:ack, :self], message: &Join.error_message/2)
+    |> cast_embed(:replay, invalid_message: "unable to parse, expected a map")
   end
 end
diff --git a/lib/realtime_web/channels/payloads/broadcast/replay.ex b/lib/realtime_web/channels/payloads/broadcast/replay.ex
new file mode 100644
index 000000000..b0a5804a2
--- /dev/null
+++ b/lib/realtime_web/channels/payloads/broadcast/replay.ex
@@ -0,0 +1,17 @@
+defmodule RealtimeWeb.Channels.Payloads.Broadcast.Replay do
+  @moduledoc """
+  Validate broadcast replay field of the join payload.
+  """
+  use Ecto.Schema
+  import Ecto.Changeset
+  alias RealtimeWeb.Channels.Payloads.Join
+
+  embedded_schema do
+    field :limit, :integer, default: 10
+    field :since, :integer, default: 0
+  end
+
+  def changeset(broadcast, attrs) do
+    cast(broadcast, attrs, [:limit, :since], message: &Join.error_message/2)
+  end
+end
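Illustration (not part of the diff): a join payload that opts into replay now looks like the map below, matching the shape validated by `Replay.changeset/2` above and exercised in `join_test.exs` / `realtime_channel_test.exs` further down. Values are placeholders; `since` is a Unix timestamp in milliseconds and the server clamps `limit` to at most 25.

```elixir
%{
  "config" => %{
    "private" => true,
    "broadcast" => %{
      "ack" => false,
      "self" => false,
      # Replay private broadcast messages from the last five minutes, at most 10 of them
      "replay" => %{
        "since" => System.system_time(:millisecond) - :timer.minutes(5),
        "limit" => 10
      }
    }
  }
}
```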
diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex
index 03bd91347..1d58d9da7 100644
--- a/lib/realtime_web/channels/realtime_channel.ex
+++ b/lib/realtime_web/channels/realtime_channel.ex
@@ -72,12 +72,21 @@ defmodule RealtimeWeb.RealtimeChannel do
          {:ok, claims, confirm_token_ref} <- confirm_token(socket),
          socket = assign_authorization_context(socket, sub_topic, claims),
          {:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id),
-         {:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, socket) do
+         {:ok, socket} <- maybe_assign_policies(sub_topic, db_conn, socket),
+         {:ok, replayed_message_ids} <-
+           maybe_replay_messages(params["config"], sub_topic, db_conn, socket.assigns.private?) do
       tenant_topic = Tenants.tenant_topic(tenant_id, sub_topic, !socket.assigns.private?)
 
       # fastlane subscription
       metadata =
-        MessageDispatcher.fastlane_metadata(transport_pid, serializer, topic, socket.assigns.log_level, tenant_id)
+        MessageDispatcher.fastlane_metadata(
+          transport_pid,
+          serializer,
+          topic,
+          log_level,
+          tenant_id,
+          replayed_message_ids
+        )
 
       RealtimeWeb.Endpoint.subscribe(tenant_topic, metadata: metadata)
 
@@ -198,6 +207,12 @@
       {:error, :shutdown_in_progress} ->
         log_error(socket, "RealtimeRestarting", "Realtime is restarting, please standby")
 
+      {:error, :failed_to_replay_messages} ->
+        log_error(socket, "UnableToReplayMessages", "Realtime was unable to replay messages")
+
+      {:error, :invalid_replay_params} ->
+        log_error(socket, "UnableToReplayMessages", "Replay params are not valid")
+
       {:error, error} ->
         log_error(socket, "UnknownErrorOnChannel", error)
         {:error, %{reason: "Unknown Error on Channel"}}
@@ -205,6 +220,17 @@
   end
 
   @impl true
+  def handle_info({:replay, messages}, socket) do
+    for message <- messages do
+      meta = %{"replayed" => true, "id" => message.id}
+      payload = %{"payload" => message.payload, "event" => message.event, "type" => "broadcast", "meta" => meta}
+
+      push(socket, "broadcast", payload)
+    end
+
+    {:noreply, socket}
+  end
+
   def handle_info(:update_rate_counter, socket) do
     count(socket)
 
@@ -762,4 +788,25 @@
       do: {:error, :private_only},
       else: :ok
   end
+
+  defp maybe_replay_messages(%{"broadcast" => %{"replay" => _}}, _sub_topic, _db_conn, false = _private?) do
+    {:error, :invalid_replay_params}
+  end
+
+  defp maybe_replay_messages(%{"broadcast" => %{"replay" => replay_params}}, sub_topic, db_conn, true = _private?)
+       when is_map(replay_params) do
+    with {:ok, messages, message_ids} <-
+           Realtime.Messages.replay(
+             db_conn,
+             sub_topic,
+             replay_params["since"],
+             replay_params["limit"] || 25
+           ) do
+      # Send to self because we can't write to the socket before finishing the join process
+      send(self(), {:replay, messages})
+      {:ok, message_ids}
+    end
+  end
+
+  defp maybe_replay_messages(_, _, _, _), do: {:ok, MapSet.new()}
 end
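Illustration (not part of the diff): a replayed message is pushed to the joining socket with a `"meta"` block flagging it as replayed, as implemented by `handle_info({:replay, messages}, socket)` above and asserted in `realtime_channel_test.exs` below. Topic, event and id are placeholder values.

```elixir
%Phoenix.Socket.Message{
  topic: "realtime:room:1",
  event: "broadcast",
  payload: %{
    "type" => "broadcast",
    "event" => "first",
    "payload" => %{"value" => "first"},
    # Replayed messages are marked so clients can distinguish them from live traffic
    "meta" => %{"id" => "<message uuid>", "replayed" => true}
  }
}
```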
diff --git a/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex b/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex
index b5db97f95..ef486c4e8 100644
--- a/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex
+++ b/lib/realtime_web/channels/realtime_channel/message_dispatcher.ex
@@ -5,12 +5,14 @@
 
   require Logger
 
-  def fastlane_metadata(fastlane_pid, serializer, topic, :info, tenant_id) do
-    {:realtime_channel_fastlane, fastlane_pid, serializer, topic, {:log, tenant_id}}
+  def fastlane_metadata(fastlane_pid, serializer, topic, log_level, tenant_id, replayed_message_ids \\ MapSet.new())
+
+  def fastlane_metadata(fastlane_pid, serializer, topic, :info, tenant_id, replayed_message_ids) do
+    {:rc_fastlane, fastlane_pid, serializer, topic, {:log, tenant_id}, replayed_message_ids}
   end
 
-  def fastlane_metadata(fastlane_pid, serializer, topic, _log_level, _tenant_id) do
-    {:realtime_channel_fastlane, fastlane_pid, serializer, topic}
+  def fastlane_metadata(fastlane_pid, serializer, topic, _log_level, _tenant_id, replayed_message_ids) do
+    {:rc_fastlane, fastlane_pid, serializer, topic, replayed_message_ids}
   end
 
   @doc """
@@ -23,22 +25,34 @@
     # This reduce caches the serialization and bypasses the channel process going straight to the
     # transport process
 
+    message_id = msg.payload["meta"]["id"]
+
     # Credo doesn't like that we don't use the result aggregation
     _ =
       Enum.reduce(subscribers, %{}, fn
         {pid, _}, cache when pid == from ->
           cache
 
-        {pid, {:realtime_channel_fastlane, fastlane_pid, serializer, join_topic}}, cache ->
-          send(pid, :update_rate_counter)
-          do_dispatch(msg, fastlane_pid, serializer, join_topic, cache)
+        {pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, replayed_message_ids}}, cache ->
+          if already_replayed?(message_id, replayed_message_ids) do
+            # skip already replayed message
+            cache
+          else
+            send(pid, :update_rate_counter)
+            do_dispatch(msg, fastlane_pid, serializer, join_topic, cache)
+          end
 
-        {pid, {:realtime_channel_fastlane, fastlane_pid, serializer, join_topic, {:log, tenant_id}}}, cache ->
-          send(pid, :update_rate_counter)
-          log = "Received message on #{join_topic} with payload: #{inspect(msg, pretty: true)}"
-          Logger.info(log, external_id: tenant_id, project: tenant_id)
+        {pid, {:rc_fastlane, fastlane_pid, serializer, join_topic, {:log, tenant_id}, replayed_message_ids}}, cache ->
+          if already_replayed?(message_id, replayed_message_ids) do
+            # skip already replayed message
+            cache
+          else
+            send(pid, :update_rate_counter)
+            log = "Received message on #{join_topic} with payload: #{inspect(msg, pretty: true)}"
+            Logger.info(log, external_id: tenant_id, project: tenant_id)
 
-          do_dispatch(msg, fastlane_pid, serializer, join_topic, cache)
+            do_dispatch(msg, fastlane_pid, serializer, join_topic, cache)
+          end
 
         {pid, _}, cache ->
           send(pid, msg)
@@ -48,6 +62,9 @@
     :ok
   end
 
+  defp already_replayed?(nil, _replayed_message_ids), do: false
+  defp already_replayed?(message_id, replayed_message_ids), do: MapSet.member?(replayed_message_ids, message_id)
+
   defp do_dispatch(msg, fastlane_pid, serializer, join_topic, cache) do
     case cache do
       %{^serializer => encoded_msg} ->
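For reference, a sketch of the two metadata tuple shapes produced by the new `fastlane_metadata/6` (illustrative values; `Phoenix.Socket.V1.JSONSerializer` stands in for whatever serializer the transport actually uses):

```elixir
alias RealtimeWeb.RealtimeChannel.MessageDispatcher

# :info log level keeps the {:log, tenant_id} marker and carries the ids already
# replayed to this subscriber, so dispatch/3 can skip them
MessageDispatcher.fastlane_metadata(
  self(),
  Phoenix.Socket.V1.JSONSerializer,
  "realtime:room:1",
  :info,
  "tenant_id",
  MapSet.new(["123"])
)
#=> {:rc_fastlane, pid, Phoenix.Socket.V1.JSONSerializer, "realtime:room:1", {:log, "tenant_id"}, MapSet.new(["123"])}

# Any other log level drops the log marker; replayed ids default to an empty set
MessageDispatcher.fastlane_metadata(self(), Phoenix.Socket.V1.JSONSerializer, "realtime:room:1", :warning, "tenant_id")
#=> {:rc_fastlane, pid, Phoenix.Socket.V1.JSONSerializer, "realtime:room:1", MapSet.new()}
```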
diff --git a/mix.exs b/mix.exs
index 372ff12c4..1e17ec551 100644
--- a/mix.exs
+++ b/mix.exs
@@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
   def project do
     [
       app: :realtime,
-      version: "2.48.2",
+      version: "2.49.0",
       elixir: "~> 1.17.3",
       elixirc_paths: elixirc_paths(Mix.env()),
       start_permanent: Mix.env() == :prod,
diff --git a/test/realtime/messages_test.exs b/test/realtime/messages_test.exs
index 3bef9a5e0..cca0ce742 100644
--- a/test/realtime/messages_test.exs
+++ b/test/realtime/messages_test.exs
@@ -16,32 +16,221 @@ defmodule Realtime.MessagesTest do
     %{conn: conn, tenant: tenant, date_start: date_start, date_end: date_end}
   end
 
-  test "delete_old_messages/1 deletes messages older than 72 hours", %{
-    conn: conn,
-    tenant: tenant,
-    date_start: date_start,
-    date_end: date_end
-  } do
-    utc_now = NaiveDateTime.utc_now()
-    limit = NaiveDateTime.add(utc_now, -72, :hour)
-
-    messages =
-      for date <- Date.range(date_start, date_end) do
-        inserted_at = date |> NaiveDateTime.new!(Time.new!(0, 0, 0))
-        message_fixture(tenant, %{inserted_at: inserted_at})
+  describe "replay/4" do
+    test "invalid replay params" do
+      assert Messages.replay(self(), "a topic", "not a number", 123) ==
+               {:error, :invalid_replay_params}
+
+      assert Messages.replay(self(), "a topic", 123, "not a number") ==
+               {:error, :invalid_replay_params}
+
+      assert Messages.replay(self(), "a topic", 253_402_300_800_000, 10) ==
+               {:error, :invalid_replay_params}
+    end
+
+    test "empty replay", %{conn: conn} do
+      assert Messages.replay(conn, "test", 0, 10) == {:ok, [], MapSet.new()}
+    end
+
+    test "replay respects limit", %{conn: conn, tenant: tenant} do
+      m1 =
+        message_fixture(tenant, %{
+          "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-1, :minute),
+          "event" => "new",
+          "extension" => "broadcast",
+          "topic" => "test",
+          "private" => true,
+          "payload" => %{"value" => "new"}
+        })
+
+      message_fixture(tenant, %{
+        "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-2, :minute),
+        "event" => "old",
+        "extension" => "broadcast",
+        "topic" => "test",
+        "private" => true,
+        "payload" => %{"value" => "old"}
+      })
+
+      assert Messages.replay(conn, "test", 0, 1) == {:ok, [m1], MapSet.new([m1.id])}
+    end
+
+    test "replay private topic only", %{conn: conn, tenant: tenant} do
+      privatem =
+        message_fixture(tenant, %{
+          "private" => true,
+          "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-1, :minute),
+          "event" => "new",
+          "extension" => "broadcast",
+          "topic" => "test",
+          "payload" => %{"value" => "new"}
+        })
+
+      message_fixture(tenant, %{
+        "private" => false,
+        "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-2, :minute),
+        "event" => "old",
+        "extension" => "broadcast",
+        "topic" => "test",
+        "payload" => %{"value" => "old"}
+      })
+
+      assert Messages.replay(conn, "test", 0, 10) == {:ok, [privatem], MapSet.new([privatem.id])}
+    end
+
+    test "replay extension=broadcast", %{conn: conn, tenant: tenant} do
+      privatem =
+        message_fixture(tenant, %{
+          "private" => true,
+          "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-1, :minute),
+          "event" => "new",
+          "extension" => "broadcast",
+          "topic" => "test",
+          "payload" => %{"value" => "new"}
+        })
+
+      message_fixture(tenant, %{
+        "private" => true,
+        "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-2, :minute),
+        "event" => "old",
+        "extension" => "presence",
+        "topic" => "test",
+        "payload" => %{"value" => "old"}
+      })
+
+      assert Messages.replay(conn, "test", 0, 10) == {:ok, [privatem], MapSet.new([privatem.id])}
+    end
+
+    test "replay respects since", %{conn: conn, tenant: tenant} do
+      m1 =
+        message_fixture(tenant, %{
+          "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-2, :minute),
+          "event" => "first",
+          "extension" => "broadcast",
+          "topic" => "test",
+          "private" => true,
+          "payload" => %{"value" => "first"}
+        })
+
+      m2 =
+        message_fixture(tenant, %{
+          "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-1, :minute),
+          "event" => "second",
+          "extension" => "broadcast",
+          "topic" => "test",
+          "private" => true,
+          "payload" => %{"value" => "second"}
+        })
+
+      message_fixture(tenant, %{
+        "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-10, :minute),
+        "event" => "old",
+        "extension" => "broadcast",
+        "topic" => "test",
+        "private" => true,
+        "payload" => %{"value" => "old"}
+      })
+
+      since = DateTime.utc_now() |> DateTime.add(-3, :minute) |> DateTime.to_unix(:millisecond)
+
+      assert Messages.replay(conn, "test", since, 10) == {:ok, [m1, m2], MapSet.new([m1.id, m2.id])}
+    end
+
+    test "replay respects hard max limit of 25", %{conn: conn, tenant: tenant} do
+      for _i <- 1..30 do
+        message_fixture(tenant, %{
+          "inserted_at" => NaiveDateTime.utc_now(),
+          "event" => "event",
+          "extension" => "broadcast",
+          "topic" => "test",
+          "private" => true,
+          "payload" => %{"value" => "message"}
+        })
       end
 
-    assert length(messages) == 11
+      assert {:ok, messages, set} = Messages.replay(conn, "test", 0, 30)
+      assert length(messages) == 25
+      assert MapSet.size(set) == 25
+    end
+
+    test "replay respects hard min limit of 1", %{conn: conn, tenant: tenant} do
+      message_fixture(tenant, %{
+        "inserted_at" => NaiveDateTime.utc_now(),
+        "event" => "event",
+        "extension" => "broadcast",
+        "topic" => "test",
+        "private" => true,
+        "payload" => %{"value" => "message"}
+      })
+
+      assert {:ok, messages, set} = Messages.replay(conn, "test", 0, 0)
+      assert length(messages) == 1
+      assert MapSet.size(set) == 1
+    end
+
+    test "distributed replay", %{conn: conn, tenant: tenant} do
+      m =
+        message_fixture(tenant, %{
+          "inserted_at" => NaiveDateTime.utc_now(),
+          "event" => "event",
+          "extension" => "broadcast",
+          "topic" => "test",
+          "private" => true,
+          "payload" => %{"value" => "message"}
+        })
+
+      {:ok, node} = Clustered.start()
+
+      # Call remote node passing the database connection that is local to this node
+      assert :erpc.call(node, Messages, :replay, [conn, "test", 0, 30]) == {:ok, [m], MapSet.new([m.id])}
+    end
+
+    test "distributed replay error", %{tenant: tenant} do
+      message_fixture(tenant, %{
+        "inserted_at" => NaiveDateTime.utc_now(),
+        "event" => "event",
+        "extension" => "broadcast",
+        "topic" => "test",
+        "private" => true,
+        "payload" => %{"value" => "message"}
+      })
+
+      {:ok, node} = Clustered.start()
+
+      # Call remote node passing the database connection that is local to this node
+      pid = spawn(fn -> :ok end)
+      assert :erpc.call(node, Messages, :replay, [pid, "test", 0, 30]) == {:error, :failed_to_replay_messages}
+    end
+  end
+
+  describe "delete_old_messages/1" do
+    test "delete_old_messages/1 deletes messages older than 72 hours", %{
+      conn: conn,
+      tenant: tenant,
+      date_start: date_start,
+      date_end: date_end
+    } do
+      utc_now = NaiveDateTime.utc_now()
+      limit = NaiveDateTime.add(utc_now, -72, :hour)
+
+      messages =
+        for date <- Date.range(date_start, date_end) do
+          inserted_at = date |> NaiveDateTime.new!(Time.new!(0, 0, 0))
+          message_fixture(tenant, %{inserted_at: inserted_at})
+        end
+
+      assert length(messages) == 11
 
-    to_keep =
-      Enum.reject(
-        messages,
-        &(NaiveDateTime.compare(limit, &1.inserted_at) == :gt)
-      )
+      to_keep =
+        Enum.reject(
+          messages,
+          &(NaiveDateTime.compare(NaiveDateTime.beginning_of_day(limit), &1.inserted_at) == :gt)
+        )
 
-    assert :ok = Messages.delete_old_messages(conn)
-    {:ok, current} = Repo.all(conn, from(m in Message), Message)
+      assert :ok = Messages.delete_old_messages(conn)
+      {:ok, current} = Repo.all(conn, from(m in Message), Message)
 
-    assert Enum.sort(current) == Enum.sort(to_keep)
+      assert Enum.sort(current) == Enum.sort(to_keep)
+    end
   end
 end
diff --git a/test/realtime/tenants/janitor/maintenance_task_test.exs b/test/realtime/tenants/janitor/maintenance_task_test.exs
index f4c51436e..4c42b7ab3 100644
--- a/test/realtime/tenants/janitor/maintenance_task_test.exs
+++ b/test/realtime/tenants/janitor/maintenance_task_test.exs
@@ -15,9 +15,15 @@
   end
 
   test "cleans messages older than 72 hours and creates partitions", %{tenant: tenant} do
+    {:ok, conn} = Database.connect(tenant, "realtime_test", :stop)
+
     utc_now = NaiveDateTime.utc_now()
     limit = NaiveDateTime.add(utc_now, -72, :hour)
 
+    date_start = Date.utc_today() |> Date.add(-10)
+    date_end = Date.utc_today()
+    create_messages_partitions(conn, date_start, date_end)
+
     messages =
       for days <- -5..0 do
         inserted_at = NaiveDateTime.add(utc_now, days, :day)
@@ -27,12 +33,11 @@
 
     to_keep =
       messages
-      |> Enum.reject(&(NaiveDateTime.compare(limit, &1.inserted_at) == :gt))
+      |> Enum.reject(&(NaiveDateTime.compare(NaiveDateTime.beginning_of_day(limit), &1.inserted_at) == :gt))
      |> MapSet.new()
 
     assert MaintenanceTask.run(tenant.external_id) == :ok
 
-    {:ok, conn} = Database.connect(tenant, "realtime_test", :stop)
     {:ok, res} = Repo.all(conn, from(m in Message), Message)
 
     verify_partitions(conn)
@@ -80,7 +85,7 @@
 
   defp verify_partitions(conn) do
     today = Date.utc_today()
-    yesterday = Date.add(today, -1)
+    yesterday = Date.add(today, -3)
     future = Date.add(today, 3)
 
     dates = Date.range(yesterday, future)
diff --git a/test/realtime/tenants/janitor_test.exs b/test/realtime/tenants/janitor_test.exs
index 4ac1a0eda..fb597a4c4 100644
--- a/test/realtime/tenants/janitor_test.exs
+++ b/test/realtime/tenants/janitor_test.exs
@@ -31,6 +31,14 @@
       end
     )
 
+    date_start = Date.utc_today() |> Date.add(-10)
+    date_end = Date.utc_today()
+
+    Enum.map(tenants, fn tenant ->
+      {:ok, conn} = Database.connect(tenant, "realtime_test", :stop)
+      create_messages_partitions(conn, date_start, date_end)
+    end)
+
     start_supervised!(
       {Task.Supervisor,
        name: Realtime.Tenants.Janitor.TaskSupervisor, max_children: 5, max_seconds: 500, max_restarts: 1}
@@ -62,7 +70,7 @@
 
     to_keep =
       messages
-      |> Enum.reject(&(NaiveDateTime.compare(limit, &1.inserted_at) == :gt))
+      |> Enum.reject(&(NaiveDateTime.compare(NaiveDateTime.beginning_of_day(limit), &1.inserted_at) == :gt))
      |> MapSet.new()
 
     start_supervised!(Janitor)
@@ -105,7 +113,7 @@
 
     to_keep =
       messages
-      |> Enum.reject(&(NaiveDateTime.compare(limit, &1.inserted_at) == :gt))
+      |> Enum.reject(&(NaiveDateTime.compare(NaiveDateTime.beginning_of_day(limit), &1.inserted_at) == :gt))
      |> MapSet.new()
 
     start_supervised!(Janitor)
@@ -162,7 +170,7 @@
 
   defp verify_partitions(conn) do
     today = Date.utc_today()
-    yesterday = Date.add(today, -1)
+    yesterday = Date.add(today, -3)
     future = Date.add(today, 3)
 
     dates = Date.range(yesterday, future)
diff --git a/test/realtime/tenants/replication_connection_test.exs b/test/realtime/tenants/replication_connection_test.exs
index 2d367a846..b28a23988 100644
--- a/test/realtime/tenants/replication_connection_test.exs
+++ b/test/realtime/tenants/replication_connection_test.exs
@@ -98,6 +98,7 @@
 
       payload = %{
         "event" => "INSERT",
+        "meta" => %{"id" => row.id},
        "payload" => %{
          "id" => row.id,
          "value" => value
@@ -139,8 +140,9 @@
                      "event" => "broadcast",
                      "payload" => %{
                        "event" => "INSERT",
+                       "meta" => %{"id" => id},
                        "payload" => %{
-                         "id" => _,
+                         "id" => id,
                          "value" => ^value
                        }
                      },
@@ -222,21 +224,26 @@
         "payload" => %{"value" => "something"}
       })
 
+      fixture_id = fixture.id
+
       assert_receive {:socket_push, :text, data}, 500
       message = data |> IO.iodata_to_binary() |> Jason.decode!()
 
       assert %{
               "event" => "broadcast",
-              "payload" => %{"event" => "INSERT", "payload" => payload, "type" => "broadcast"},
+              "payload" => %{
+                "event" => "INSERT",
+                "meta" => %{"id" => ^fixture_id},
+                "payload" => payload,
+                "type" => "broadcast"
+              },
              "ref" => nil,
              "topic" => ^topic
            } = message
 
-      id = fixture.id
-
       assert payload == %{
               "value" => "something",
-              "id" => id
+              "id" => fixture_id
            }
     end
 
@@ -252,19 +259,25 @@
 
       payload = %{"value" => "something", "id" => "123456"}
 
-      message_fixture(tenant, %{
-        "topic" => topic,
-        "private" => true,
-        "event" => "INSERT",
-        "payload" => payload
-      })
+      %{id: fixture_id} =
+        message_fixture(tenant, %{
+          "topic" => topic,
+          "private" => true,
+          "event" => "INSERT",
+          "payload" => payload
+        })
 
       assert_receive {:socket_push, :text, data}, 500
       message = data |> IO.iodata_to_binary() |> Jason.decode!()
 
       assert %{
               "event" => "broadcast",
-              "payload" => %{"event" => "INSERT", "payload" => ^payload, "type" => "broadcast"},
+              "payload" => %{
+                "meta" => %{"id" => ^fixture_id},
+                "event" => "INSERT",
+                "payload" => ^payload,
+                "type" => "broadcast"
+              },
              "ref" => nil,
              "topic" => ^topic
            } = message
diff --git a/test/realtime_web/channels/payloads/join_test.exs b/test/realtime_web/channels/payloads/join_test.exs
index 32bf1b397..c1ea54a67 100644
--- a/test/realtime_web/channels/payloads/join_test.exs
+++ b/test/realtime_web/channels/payloads/join_test.exs
@@ -6,6 +6,7 @@ defmodule RealtimeWeb.Channels.Payloads.JoinTest do
   alias RealtimeWeb.Channels.Payloads.Join
   alias RealtimeWeb.Channels.Payloads.Config
   alias RealtimeWeb.Channels.Payloads.Broadcast
+  alias RealtimeWeb.Channels.Payloads.Broadcast.Replay
   alias RealtimeWeb.Channels.Payloads.Presence
   alias RealtimeWeb.Channels.Payloads.PostgresChange
 
@@ -17,7 +18,7 @@ defmodule RealtimeWeb.Channels.Payloads.JoinTest do
     config = %{
       "config" => %{
         "private" => false,
-        "broadcast" => %{"ack" => false, "self" => false},
+        "broadcast" => %{"ack" => false, "self" => false, "replay" => %{"since" => 1, "limit" => 10}},
         "presence" => %{"enabled" => true, "key" => key},
         "postgres_changes" => [
           %{"event" => "INSERT", "schema" => "public", "table" => "users", "filter" => "id=eq.1"},
@@ -37,8 +38,9 @@
             postgres_changes: postgres_changes
           } = config
 
-    assert %Broadcast{ack: false, self: false} = broadcast
+    assert %Broadcast{ack: false, self: false, replay: replay} = broadcast
     assert %Presence{enabled: true, key: ^key} = presence
+    assert %Replay{since: 1, limit: 10} = replay
 
     assert [
             %PostgresChange{event: "INSERT", schema: "public", table: "users", filter: "id=eq.1"},
@@ -56,6 +58,17 @@
     assert is_binary(key)
   end
 
+  test "invalid replay" do
+    config = %{"config" => %{"broadcast" => %{"replay" => 123}}}
+
+    assert {
+             :error,
+             :invalid_join_payload,
+             %{config: %{broadcast: %{replay: ["unable to parse, expected a map"]}}}
+           } =
+             Join.validate(config)
+  end
+
   test "missing enabled presence defaults to true" do
     config = %{"config" => %{"presence" => %{}}}
 
diff --git a/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs b/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs
index 7a9e2eb25..91b16c089 100644
--- a/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs
+++ b/test/realtime_web/channels/realtime_channel/message_dispatcher_test.exs
@@ -16,12 +16,12 @@
   describe "fastlane_metadata/5" do
     test "info level" do
       assert MessageDispatcher.fastlane_metadata(self(), Serializer, "realtime:topic", :info, "tenant_id") ==
-               {:realtime_channel_fastlane, self(), Serializer, "realtime:topic", {:log, "tenant_id"}}
+               {:rc_fastlane, self(), Serializer, "realtime:topic", {:log, "tenant_id"}, MapSet.new()}
     end
 
     test "non-info level" do
       assert MessageDispatcher.fastlane_metadata(self(), Serializer, "realtime:topic", :warning, "tenant_id") ==
-               {:realtime_channel_fastlane, self(), Serializer, "realtime:topic"}
+               {:rc_fastlane, self(), Serializer, "realtime:topic", MapSet.new()}
     end
   end
 
@@ -50,12 +50,11 @@
     from_pid = :erlang.list_to_pid(~c'<0.2.1>')
 
     subscribers = [
-      {subscriber_pid, {:realtime_channel_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}}},
-      {subscriber_pid, {:realtime_channel_fastlane, self(), TestSerializer, "realtime:topic"}}
+      {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}, MapSet.new()}},
+      {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", MapSet.new()}}
     ]
 
     msg = %Broadcast{topic: "some:other:topic", event: "event", payload: %{data: "test"}}
-    require Logger
 
     log =
       capture_log(fn ->
@@ -75,6 +74,44 @@
     refute_receive _any
   end
 
+  test "does not dispatch messages to fastlane subscribers if they already replayed it" do
+    parent = self()
+
+    subscriber_pid =
+      spawn(fn ->
+        loop = fn loop ->
+          receive do
+            msg ->
+              send(parent, {:subscriber, msg})
+              loop.(loop)
+          end
+        end
+
+        loop.(loop)
+      end)
+
+    from_pid = :erlang.list_to_pid(~c'<0.2.1>')
+    replayed_message_ids = MapSet.new(["123"])
+
+    subscribers = [
+      {subscriber_pid,
+       {:rc_fastlane, self(), TestSerializer, "realtime:topic", {:log, "tenant123"}, replayed_message_ids}},
+      {subscriber_pid, {:rc_fastlane, self(), TestSerializer, "realtime:topic", replayed_message_ids}}
+    ]
+
+    msg = %Broadcast{
+      topic: "some:other:topic",
+      event: "event",
+      payload: %{"data" => "test", "meta" => %{"id" => "123"}}
+    }
+
+    assert MessageDispatcher.dispatch(subscribers, from_pid, msg) == :ok
+
+    assert Agent.get(TestSerializer, & &1) == 0
+
+    refute_receive _any
+  end
+
   test "dispatches messages to non fastlane subscribers" do
     from_pid = :erlang.list_to_pid(~c'<0.2.1>')
 
diff --git a/test/realtime_web/channels/realtime_channel_test.exs b/test/realtime_web/channels/realtime_channel_test.exs
index 2dff83da3..4d90c3588 100644
--- a/test/realtime_web/channels/realtime_channel_test.exs
+++ b/test/realtime_web/channels/realtime_channel_test.exs
@@ -28,6 +28,168 @@
 
   setup :rls_context
 
+  describe "broadcast" do
+    @describetag policies: [:authenticated_all_topic_read]
+
+    test "wrong replay params", %{tenant: tenant} do
+      jwt = Generators.generate_jwt_token(tenant)
+      {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt))
+
+      config = %{
+        "private" => true,
+        "broadcast" => %{
+          "replay" => %{"limit" => "not a number", "since" => :erlang.system_time(:millisecond) - 5 * 60000}
+        }
+      }
+
+      assert {:error, %{reason: "UnableToReplayMessages: Replay params are not valid"}} =
+               subscribe_and_join(socket, "realtime:test", %{"config" => config})
+
+      config = %{
+        "private" => true,
+        "broadcast" => %{
+          "replay" => %{"limit" => 1, "since" => "not a number"}
+        }
+      }
+
+      assert {:error, %{reason: "UnableToReplayMessages: Replay params are not valid"}} =
+               subscribe_and_join(socket, "realtime:test", %{"config" => config})
+
+      config = %{
+        "private" => true,
+        "broadcast" => %{
+          "replay" => %{}
+        }
+      }
+
+      assert {:error, %{reason: "UnableToReplayMessages: Replay params are not valid"}} =
+               subscribe_and_join(socket, "realtime:test", %{"config" => config})
+    end
+
+    test "failure to replay", %{tenant: tenant} do
+      jwt = Generators.generate_jwt_token(tenant)
+      {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt))
+
+      config = %{
+        "private" => true,
+        "broadcast" => %{
+          "replay" => %{"limit" => 12, "since" => :erlang.system_time(:millisecond) - 5 * 60000}
+        }
+      }
+
+      Authorization
+      |> expect(:get_read_authorizations, fn _, _, _ ->
+        {:ok,
+         %Authorization.Policies{
+           broadcast: %Authorization.Policies.BroadcastPolicies{read: true, write: nil}
+         }}
+      end)
+
+      # Broken database connection
+      conn = spawn(fn -> :ok end)
+      Connect.lookup_or_start_connection(tenant.external_id)
+      {:ok, _} = :syn.update_registry(Connect, tenant.external_id, fn _pid, meta -> %{meta | conn: conn} end)
+
+      assert {:error, %{reason: "UnableToReplayMessages: Realtime was unable to replay messages"}} =
+               subscribe_and_join(socket, "realtime:test", %{"config" => config})
+    end
+
+    test "replay messages on public topic not allowed", %{tenant: tenant} do
+      jwt = Generators.generate_jwt_token(tenant)
+      {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt))
+
+      config = %{
+        "presence" => %{"enabled" => false},
+        "broadcast" => %{"replay" => %{"limit" => 2, "since" => :erlang.system_time(:millisecond) - 5 * 60000}}
+      }
+
+      assert {
+               :error,
+               %{reason: "UnableToReplayMessages: Replay params are not valid"}
+             } = subscribe_and_join(socket, "realtime:test", %{"config" => config})
+
+      refute_receive _any
+    end
+
+    @tag policies: [:authenticated_all_topic_read]
+    test "replay messages on private topic", %{tenant: tenant} do
+      jwt = Generators.generate_jwt_token(tenant)
+      {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant, jwt))
+
+      # Old message
+      message_fixture(tenant, %{
+        "private" => true,
+        "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-1, :day),
+        "event" => "old",
+        "extension" => "broadcast",
+        "topic" => "test",
+        "payload" => %{"value" => "old"}
+      })
+
+      %{id: message1_id} =
+        message_fixture(tenant, %{
+          "private" => true,
+          "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-1, :minute),
+          "event" => "first",
+          "extension" => "broadcast",
+          "topic" => "test",
+          "payload" => %{"value" => "first"}
+        })
+
+      %{id: message2_id} =
+        message_fixture(tenant, %{
+          "private" => true,
+          "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-2, :minute),
+          "event" => "second",
+          "extension" => "broadcast",
+          "topic" => "test",
+          "payload" => %{"value" => "second"}
+        })
+
+      # This one should not be received because of the limit
+      message_fixture(tenant, %{
+        "private" => true,
+        "inserted_at" => NaiveDateTime.utc_now() |> NaiveDateTime.add(-3, :minute),
+        "event" => "third",
+        "extension" => "broadcast",
+        "topic" => "test",
+        "payload" => %{"value" => "third"}
+      })
+
+      config = %{
+        "private" => true,
+        "presence" => %{"enabled" => false},
+        "broadcast" => %{"replay" => %{"limit" => 2, "since" => :erlang.system_time(:millisecond) - 5 * 60000}}
+      }
+
+      assert {:ok, _, %Socket{}} = subscribe_and_join(socket, "realtime:test", %{"config" => config})
+
+      assert_receive %Socket.Message{
+        topic: "realtime:test",
+        event: "broadcast",
+        payload: %{
+          "event" => "first",
+          "meta" => %{"id" => ^message1_id, "replayed" => true},
+          "payload" => %{"value" => "first"},
+          "type" => "broadcast"
+        }
+      }
+
+      assert_receive %Socket.Message{
+        topic: "realtime:test",
+        event: "broadcast",
+        payload: %{
+          "event" => "second",
+          "meta" => %{"id" => ^message2_id, "replayed" => true},
+          "payload" => %{"value" => "second"},
+          "type" => "broadcast"
+        }
+      }
+
+      refute_receive %Socket.Message{}
+    end
+  end
+
   describe "presence" do
     test "events are counted", %{tenant: tenant} do
       jwt = Generators.generate_jwt_token(tenant)