Skip to content

Commit

Permalink
fix: Add flag to enable / disable Authorization
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Apr 18, 2024
1 parent b4d8561 commit 0e40c41
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 36 deletions.
4 changes: 3 additions & 1 deletion lib/realtime/api/tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule Realtime.Api.Tenant do
field(:suspend, :boolean, default: false)
field(:events_per_second_rolling, :float, virtual: true)
field(:events_per_second_now, :integer, virtual: true)
field(:enable_authorization, :boolean, default: false)

has_many(:extensions, Realtime.Api.Extensions,
foreign_key: :tenant_external_id,
Expand Down Expand Up @@ -73,7 +74,8 @@ defmodule Realtime.Api.Tenant do
:max_bytes_per_second,
:max_channels_per_client,
:max_joins_per_second,
:suspend
:suspend,
:enable_authorization
])
|> validate_required([
:external_id,
Expand Down
35 changes: 23 additions & 12 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule RealtimeWeb.RealtimeChannel do
alias DBConnection.Backoff

alias Realtime.Channels.Cache, as: ChannelsCache
alias Realtime.Api.Tenant
alias Realtime.GenCounter
alias Realtime.Helpers
alias Realtime.PostgresCdc
Expand All @@ -19,6 +20,7 @@ defmodule RealtimeWeb.RealtimeChannel do
alias Realtime.Tenants.Authorization.Policies.BroadcastPolicies
alias Realtime.Tenants.Authorization.Policies.ChannelPolicies
alias Realtime.Tenants.Authorization.Policies.PresencePolicies
alias Realtime.Tenants.Cache, as: TenantCache
alias Realtime.Tenants.Connect

alias RealtimeWeb.ChannelsAuthorization
Expand All @@ -31,13 +33,13 @@ defmodule RealtimeWeb.RealtimeChannel do
@impl true
def join("realtime:" <> sub_topic = topic, params, socket) do
%{
assigns: %{tenant: tenant, log_level: log_level, postgres_cdc_module: module},
assigns: %{tenant: tenant_id, log_level: log_level, postgres_cdc_module: module},
channel_pid: channel_pid,
serializer: serializer,
transport_pid: transport_pid
} = socket

Logger.metadata(external_id: tenant, project: tenant)
Logger.metadata(external_id: tenant_id, project: tenant_id)
Logger.put_process_level(self(), log_level)

socket =
Expand All @@ -46,21 +48,22 @@ defmodule RealtimeWeb.RealtimeChannel do
|> assign_counter()
|> assign(:using_broadcast?, !!params["config"]["broadcast"])

start_db_rate_counter(tenant)
start_db_rate_counter(tenant_id)

with false <- SignalHandler.shutdown_in_progress?(),
:ok <- limit_joins(socket.assigns),
:ok <- limit_channels(socket),
:ok <- limit_max_users(socket.assigns),
{:ok, claims, confirm_token_ref, access_token, _} <- confirm_token(socket),
{:ok, db_conn} <- Connect.lookup_or_start_connection(tenant),
{:ok, db_conn} <- Connect.lookup_or_start_connection(tenant_id),
tenant = TenantCache.get_tenant_by_external_id(tenant_id),
channel = ChannelsCache.get_channel_by_name(sub_topic, db_conn),
{:ok, socket} <- assign_policies(channel, db_conn, access_token, claims, socket) do
{:ok, socket} <- assign_policies(tenant, channel, db_conn, access_token, claims, socket) do
is_new_api = is_new_api(params)
public? = !match?({:ok, _}, channel)
tenant_topic = Tenants.tenant_topic(tenant, sub_topic, public?)
tenant_topic = Tenants.tenant_topic(tenant_id, sub_topic, public?)

Realtime.UsersCounter.add(transport_pid, tenant)
Realtime.UsersCounter.add(transport_pid, tenant_id)
RealtimeWeb.Endpoint.subscribe(tenant_topic)

pg_change_params = pg_change_params(is_new_api, params, channel_pid, claims, sub_topic)
Expand All @@ -71,7 +74,7 @@ defmodule RealtimeWeb.RealtimeChannel do
transport_pid: transport_pid,
serializer: serializer,
topic: topic,
tenant: tenant,
tenant: tenant_id,
module: module
}

Expand Down Expand Up @@ -497,11 +500,12 @@ defmodule RealtimeWeb.RealtimeChannel do
%{
access_token: access_token,
db_conn: db_conn,
channel_name: channel_name
channel_name: channel_name,
tenant: tenant
} = assigns

with channel = ChannelsCache.get_channel_by_name(channel_name, db_conn),
{:ok, socket} <- assign_policies(channel, db_conn, access_token, claims, socket) do
{:ok, socket} <- assign_policies(tenant, channel, db_conn, access_token, claims, socket) do
{:ok, socket}
end
end
Expand Down Expand Up @@ -615,7 +619,14 @@ defmodule RealtimeWeb.RealtimeChannel do
end)
end

defp assign_policies({:ok, channel}, db_conn, access_token, claims, socket) do
defp assign_policies(
%Tenant{enable_authorization: true},
{:ok, channel},
db_conn,
access_token,
claims,
socket
) do
%{using_broadcast?: using_broadcast?} = socket.assigns

authorization_context =
Expand Down Expand Up @@ -645,7 +656,7 @@ defmodule RealtimeWeb.RealtimeChannel do
end
end

defp assign_policies(_, _, _, _, socket) do
defp assign_policies(_, _, _, _, _, socket) do
{:ok, assign(socket, policies: nil)}
end
end
46 changes: 24 additions & 22 deletions lib/realtime_web/controllers/broadcast_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -89,29 +89,31 @@ defmodule RealtimeWeb.BroadcastController do
end)
end)

# Handle events with authorization
channels_names_to_check
|> Enum.reduce([], fn sub_topic, acc ->
Enum.filter(events, fn
{^sub_topic, _} -> true
_ -> false
end) ++ acc
end)
|> Enum.map(fn {_, event} -> event end)
|> Enum.each(fn %{topic: channel_name, payload: payload, event: event} ->
Helpers.transaction(db_conn, fn transaction_conn ->
case permissions_for_channel(conn, transaction_conn, channels, channel_name) do
%Policies{
channel: %ChannelPolicies{read: true},
broadcast: %BroadcastPolicies{write: true}
} ->
send_message_and_count(tenant, channel_name, event, payload, false)

_ ->
nil
end
if(tenant.enable_authorization) do

Check warning on line 92 in lib/realtime_web/controllers/broadcast_controller.ex

View workflow job for this annotation

GitHub Actions / Formatting Checks

The condition of `if` should not be wrapped in parentheses.
# Handle events with authorization
channels_names_to_check
|> Enum.reduce([], fn sub_topic, acc ->
Enum.filter(events, fn
{^sub_topic, _} -> true
_ -> false
end) ++ acc
end)
end)
|> Enum.map(fn {_, event} -> event end)
|> Enum.each(fn %{topic: channel_name, payload: payload, event: event} ->
Helpers.transaction(db_conn, fn transaction_conn ->
case permissions_for_channel(conn, transaction_conn, channels, channel_name) do
%Policies{
channel: %ChannelPolicies{read: true},
broadcast: %BroadcastPolicies{write: true}
} ->
send_message_and_count(tenant, channel_name, event, payload, false)

_ ->
nil
end
end)
end)
end

send_resp(conn, :accepted, "")
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.28.25",
version: "2.28.26",
elixir: "~> 1.14.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule Realtime.Repo.Migrations.AddAuthorizationFlag do
use Ecto.Migration

def change do
alter table(:tenants) do
add :enable_authorization, :boolean, default: false
end
end
end
1 change: 1 addition & 0 deletions priv/repo/seeds_after_migration.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ end

%{
"name" => tenant_name,
"enable_authorization" => true,
"extensions" => [
%{
"type" => "postgres_cdc_rls",
Expand Down
1 change: 1 addition & 0 deletions test/support/generators.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule Generators do
def tenant_fixture(override \\ %{}) do
create_attrs = %{
"external_id" => random_string(),
"enable_authorization" => true,
"name" => "localhost",
"extensions" => [
%{
Expand Down

0 comments on commit 0e40c41

Please sign in to comment.