Skip to content

Commit

Permalink
fix: Remove policies temporarly to avoid stopping service (#847)
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Apr 13, 2024
1 parent 9bf0fbb commit 815a520
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 341 deletions.
110 changes: 10 additions & 100 deletions lib/realtime_web/controllers/broadcast_controller.ex
Original file line number Diff line number Diff line change
@@ -1,28 +1,18 @@
defmodule RealtimeWeb.BroadcastController do
use RealtimeWeb, :controller
use OpenApiSpex.ControllerSpecs
require Logger
import Ecto.Query

alias Realtime.Api.Channel
alias Realtime.Api.Tenant
alias Realtime.GenCounter
alias Realtime.Helpers
alias Realtime.RateCounter
alias Realtime.Repo
alias Realtime.Tenants
alias Realtime.Tenants.Authorization
alias Realtime.Tenants.Authorization.Policies
alias Realtime.Tenants.Authorization.Policies.BroadcastPolicies
alias Realtime.Tenants.Authorization.Policies.ChannelPolicies
alias Realtime.Tenants.BatchBroadcast
alias Realtime.Tenants.Connect

alias RealtimeWeb.Endpoint

alias RealtimeWeb.OpenApiSchemas.EmptyResponse
alias RealtimeWeb.OpenApiSchemas.TenantBatchParams
alias RealtimeWeb.OpenApiSchemas.TooManyRequestsResponse
alias RealtimeWeb.OpenApiSchemas.UnprocessableEntityResponse
alias RealtimeWeb.OpenApiSchemas.TooManyRequestsResponse

action_fallback(RealtimeWeb.FallbackController)

Expand Down Expand Up @@ -50,102 +40,22 @@ defmodule RealtimeWeb.BroadcastController do
def broadcast(%{assigns: %{tenant: tenant}} = conn, attrs) do
with %Ecto.Changeset{valid?: true} = changeset <-
BatchBroadcast.changeset(%BatchBroadcast{}, attrs),
%Ecto.Changeset{changes: %{messages: messages}} = changeset,
events_per_second_key = Tenants.events_per_second_key(tenant),
:ok <- check_rate_limit(events_per_second_key, tenant, length(messages)),
{:ok, db_conn} <- Connect.lookup_or_start_connection(tenant.external_id) do
events =
Enum.map(messages, fn %{changes: %{topic: sub_topic} = event} -> {sub_topic, event} end)
%Ecto.Changeset{changes: %{messages: messages}} <- changeset,
events_per_second_key <- Tenants.events_per_second_key(tenant),
:ok <- check_rate_limit(events_per_second_key, tenant, length(messages)) do
for %{changes: %{topic: sub_topic, payload: payload, event: event}} <- messages do
tenant_topic = Tenants.tenant_topic(tenant, sub_topic)
payload = %{"payload" => payload, "event" => event, "type" => "broadcast"}

channel_names = events |> Enum.map(fn {sub_topic, _} -> sub_topic end) |> MapSet.new()
Endpoint.broadcast_from(self(), tenant_topic, "broadcast", payload)

if MapSet.size(channel_names) > 1 do
Logger.warning(
"This Broadcast is sending to multiple channels. Avoid this as it impact your performance."
)
GenCounter.add(events_per_second_key)
end

query_to_check = from(c in Channel, where: c.name in ^MapSet.to_list(channel_names))

channels =
Helpers.transaction(db_conn, fn transaction_conn ->
transaction_conn
|> Repo.all(query_to_check, Channel)
|> then(fn {:ok, channels} -> channels end)
end)

channels_names_to_check =
channels
|> Enum.map(& &1.name)
|> MapSet.new()

# Handle events without authorization
MapSet.difference(channel_names, channels_names_to_check)
|> Enum.each(fn channel_name ->
events
|> Enum.filter(fn {sub_topic, _} -> sub_topic == channel_name end)
|> Enum.each(fn {_, %{topic: sub_topic, payload: payload, event: event}} ->
send_message_and_count(tenant, sub_topic, event, payload, true)
end)
end)

# Handle events with authorization
channels_names_to_check
|> Enum.reduce([], fn sub_topic, acc ->
Enum.filter(events, fn
{^sub_topic, _} -> true
_ -> false
end) ++ acc
end)
|> Enum.map(fn {_, event} -> event end)
|> Enum.each(fn %{topic: channel_name, payload: payload, event: event} ->
Helpers.transaction(db_conn, fn transaction_conn ->
case permissions_for_channel(conn, transaction_conn, channels, channel_name) do
%Policies{
channel: %ChannelPolicies{read: true},
broadcast: %BroadcastPolicies{write: true}
} ->
send_message_and_count(tenant, channel_name, event, payload, false)

_ ->
nil
end
end)
end)

send_resp(conn, :accepted, "")
end
end

defp send_message_and_count(tenant, channel_name, event, payload, public?) do
events_per_second_key = Tenants.events_per_second_key(tenant)
tenant_topic = Tenants.tenant_topic(tenant, channel_name, public?)
payload = %{"payload" => payload, "event" => event, "type" => "broadcast"}

GenCounter.add(events_per_second_key)
Endpoint.broadcast_from(self(), tenant_topic, "broadcast", payload)
end

defp permissions_for_channel(conn, db_conn, channels, channel_name) do
params = %{
headers: conn.req_headers,
jwt: conn.assigns.jwt,
claims: conn.assigns.claims,
role: conn.assigns.role
}

with channel <- Enum.find(channels, &(&1.name == channel_name)),
params = Map.put(params, :channel, channel),
params = Map.put(params, :channel_name, channel.name),
params = Authorization.build_authorization_params(params),
%Policies{} = policies <- Authorization.get_authorizations(db_conn, params) do
policies
else
{:error, :not_found} -> nil
error -> error
end
end

defp check_rate_limit(events_per_second_key, %Tenant{} = tenant, total_messages_to_broadcast) do
%{max_events_per_second: max_events_per_second} = tenant
{:ok, %{avg: events_per_second}} = RateCounter.get(events_per_second_key)
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.28.20",
version: "2.28.21",
elixir: "~> 1.14.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down

0 comments on commit 815a520

Please sign in to comment.