Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions lib/realtime/encryption.ex → lib/realtime/crypto.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,6 @@ defmodule Realtime.Crypto do
|> unpad()
end

@doc "
Decrypts the given credentials
"
@spec decrypt_creds(binary(), binary(), binary(), binary(), binary()) ::
{binary(), binary(), binary(), binary(), binary()}
def decrypt_creds(host, port, name, user, pass) do
{
decrypt!(host),
decrypt!(port),
decrypt!(name),
decrypt!(user),
decrypt!(pass)
}
end

defp pad(data) do
to_add = 16 - rem(byte_size(data), 16)
data <> :binary.copy(<<to_add>>, to_add)
Expand Down
47 changes: 39 additions & 8 deletions lib/realtime/database.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,25 +85,38 @@ defmodule Realtime.Database do
}
end

@available_connection_factor 0.95

defguardp can_connect?(available_connections, required_pool)
when required_pool * @available_connection_factor < available_connections

@doc """
Checks if the Tenant CDC extension information is properly configured and that we're able to query against the tenant database.
"""
@spec check_tenant_connection(Tenant.t() | nil, binary()) :: {:error, atom()} | {:ok, pid()}
def check_tenant_connection(tenant, application_name)
def check_tenant_connection(nil, _), do: {:error, :tenant_not_found}

def check_tenant_connection(tenant, application_name) do
@spec check_tenant_connection(Tenant.t() | nil) :: {:error, atom()} | {:ok, pid()}
def check_tenant_connection(nil), do: {:error, :tenant_not_found}

def check_tenant_connection(tenant) do
tenant
|> then(&PostgresCdc.filter_settings(@cdc, &1.extensions))
|> then(fn settings ->
check_settings = from_settings(settings, application_name, :stop)
required_pool = tenant_pool_requirements(settings)
check_settings = from_settings(settings, "realtime_connect", :stop)
check_settings = Map.put(check_settings, :max_restarts, 0)

with {:ok, conn} <- connect_db(check_settings) do
case Postgrex.query(conn, "SELECT 1", []) do
{:ok, _} ->
query =
"select (current_setting('max_connections')::int - count(*))::int from pg_stat_activity"

case Postgrex.query(conn, query, []) do
{:ok, %{rows: [[available_connections]]}}
when can_connect?(available_connections, required_pool) ->
{:ok, conn}

{:ok, _} ->
{:error, :tenant_db_too_many_connections}

{:error, e} ->
Process.exit(conn, :kill)
log_error("UnableToConnectToTenantDatabase", e)
Expand Down Expand Up @@ -239,7 +252,6 @@ defmodule Realtime.Database do
@spec get_external_id(String.t()) :: {:ok, String.t()} | {:error, atom()}
def get_external_id(host) when is_binary(host) do
case String.split(host, ".", parts: 2) do
[] -> {:error, :tenant_not_found_in_host}
[id] -> {:ok, id}
[id, _] -> {:ok, id}
end
Expand Down Expand Up @@ -299,4 +311,23 @@ defmodule Realtime.Database do
Postgrex.query(conn, "select pg_drop_replication_slot($1)", [slot_name])
:ok
end

defp tenant_pool_requirements(settings) do
application_names = [
"realtime_subscription_manager",
"realtime_subscription_manager_pub",
"realtime_subscription_checker",
"realtime_connect",
"realtime_health_check",
"realtime_janitor",
"realtime_migrations",
"realtime_broadcast_changes",
"realtime_rls",
"realtime_replication_slot_teardown"
]

Enum.reduce(application_names, 0, fn application_name, acc ->
acc + pool_size_by_application_name(application_name, settings)
end)
end
end
20 changes: 14 additions & 6 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ defmodule Realtime.Tenants.Connect do
| :tenant_database_connection_initializing}
def get_status(tenant_id) do
case :syn.lookup(__MODULE__, tenant_id) do
{_, %{conn: conn}} when not is_nil(conn) ->
{:ok, conn}

{_, %{conn: nil}} ->
{:error, :initializing}

{_, %{conn: conn}} ->
{:ok, conn}

:undefined ->
Logger.warning("Connection process starting up")
{:error, :tenant_database_connection_initializing}
Expand Down Expand Up @@ -109,7 +109,13 @@ defmodule Realtime.Tenants.Connect do
{:error, {:already_started, _}} ->
get_status(tenant_id)

{:error, :killed} ->
{:error, {:shutdown, :tenant_db_too_many_connections}} ->
{:error, :tenant_db_too_many_connections}

{:error, {:shutdown, :tenant_not_found}} ->
{:error, :tenant_not_found}

{:error, :shutdown} ->
log_error("UnableToConnectToTenantDatabase", "Unable to connect to tenant database")
{:error, :tenant_database_unavailable}

Expand Down Expand Up @@ -169,8 +175,10 @@ defmodule Realtime.Tenants.Connect do
{:ok, acc, {:continue, :run_migrations}}
else
{:error, :tenant_not_found} ->
log_error("TenantNotFound", "Tenant not found")
{:stop, :shutdown}
{:stop, {:shutdown, :tenant_not_found}}

{:error, :tenant_db_too_many_connections} ->
{:stop, {:shutdown, :tenant_db_too_many_connections}}

{:error, error} ->
log_error("UnableToConnectToTenantDatabase", error)
Expand Down
3 changes: 1 addition & 2 deletions lib/realtime/tenants/connect/check_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@ defmodule Realtime.Tenants.Connect.CheckConnection do
"""
alias Realtime.Database

@application_name "realtime_connect"
@behaviour Realtime.Tenants.Connect.Piper
@impl true
def run(acc) do
%{tenant: tenant} = acc

case Database.check_tenant_connection(tenant, @application_name) do
case Database.check_tenant_connection(tenant) do
{:ok, conn} ->
{:ok, %{acc | db_conn_pid: conn, db_conn_reference: Process.monitor(conn)}}

Expand Down
4 changes: 4 additions & 0 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ defmodule RealtimeWeb.RealtimeChannel do
msg = "Please increase your connection pool size"
Logging.log_error_message(:warning, "IncreaseConnectionPool", msg)

{:error, :tenant_db_too_many_connections} ->
msg = "Database can't accept more connections, Realtime won't connect"
Logging.log_error_message(:warning, "DatabaseLackOfConnections", msg)

{:error, :unable_to_set_policies, error} ->
Logging.log_error_message(:warning, "UnableToSetPolicies", error)

Expand Down
1 change: 0 additions & 1 deletion lib/realtime_web/plugs/assign_tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ defmodule RealtimeWeb.Plugs.AssignTenant do

assign(conn, :tenant, tenant)
else
{:error, :tenant_not_found_in_host} -> error_response(conn, "Tenant not found in host")
nil -> error_response(conn, "Tenant not found in database")
end
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.34.15",
version: "2.34.17",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
16 changes: 16 additions & 0 deletions test/realtime/adapters/postgres/protocol_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule Realtime.Adapters.Postgres.ProtocolTest do
use ExUnit.Case
alias Realtime.Adapters.Postgres.Protocol

test "defguard is_write/1" do
require Protocol
assert Protocol.is_write("w")
refute Protocol.is_write("k")
end

test "defguard is_keep_alive/1" do
require Protocol
assert Protocol.is_keep_alive("k")
refute Protocol.is_keep_alive("w")
end
end
42 changes: 42 additions & 0 deletions test/realtime/database_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,48 @@ defmodule Realtime.DatabaseTest do
%{tenant: tenant}
end

describe "check_tenant_connection/1" do
setup context do
extensions = [
%{
"type" => "postgres_cdc_rls",
"settings" => %{
"db_host" => "localhost",
"db_name" => "postgres",
"db_user" => "supabase_admin",
"db_password" => "postgres",
"db_port" => "5433",
"poll_interval" => 100,
"poll_max_changes" => 100,
"poll_max_record_bytes" => 1_048_576,
"region" => "us-east-1",
"ssl_enforced" => false,
"db_pool" => Map.get(context, :db_pool),
"subcriber_pool_size" => Map.get(context, :subcriber_pool),
"subs_pool_size" => Map.get(context, :db_pool)
}
}
]

tenant = tenant_fixture(%{extensions: extensions})

%{tenant: tenant}
end

test "connects to a tenant database", %{tenant: tenant} do
assert {:ok, _} = Database.check_tenant_connection(tenant)
end

# Connection limit for docker tenant db is 100
@tag db_pool: 50,
subs_pool_size: 50,
subcriber_pool_size: 50
test "restricts connection if tenant database cannot receive more connections based on tenant pool",
%{tenant: tenant} do
assert {:error, :tenant_db_too_many_connections} = Database.check_tenant_connection(tenant)
end
end

describe "replication_slot_teardown/1" do
test "removes replication slots with the realtime prefix", %{tenant: tenant} do
{:ok, conn} = Database.connect(tenant, "realtime_test", :stop)
Expand Down
34 changes: 4 additions & 30 deletions test/realtime/repo_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ defmodule Realtime.RepoTest do
import Ecto.Query

alias Realtime.Api.Message
alias Realtime.Crypto
alias Realtime.Repo
alias Realtime.Database
alias Realtime.Tenants.Migrations
Expand Down Expand Up @@ -328,34 +327,9 @@ defmodule Realtime.RepoTest do
end

defp db_config() do
tenant = tenant_fixture()

%{
"db_host" => db_host,
"db_name" => db_name,
"db_password" => db_password,
"db_port" => db_port,
"db_user" => db_user
} = args = tenant.extensions |> hd() |> then(& &1.settings)

{host, port, name, user, pass} =
Crypto.decrypt_creds(
db_host,
db_port,
db_name,
db_user,
db_password
)

ssl_enforced = Database.default_ssl_param(args)

[
hostname: host,
port: port,
database: name,
password: pass,
username: user,
ssl_enforced: ssl_enforced
]
tenant_fixture()
|> Realtime.Database.from_tenant("realtime_test")
|> Map.to_list()
|> Keyword.new()
end
end
38 changes: 38 additions & 0 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,16 @@ defmodule Realtime.Tenants.ConnectTest do
assert ReplicationConnection.whereis(tenant.external_id) == nil
assert Listen.whereis(tenant.external_id) == nil
end

test "syn with no connection", %{tenant: tenant} do
with_mock :syn, [], lookup: fn _, _ -> {nil, %{conn: nil}} end do
assert {:error, :tenant_database_unavailable} =
Connect.lookup_or_start_connection(tenant.external_id)

assert {:error, :initializing} =
Connect.get_status(tenant.external_id)
end
end
end

describe "shutdown/1" do
Expand All @@ -272,6 +282,34 @@ defmodule Realtime.Tenants.ConnectTest do
test "if tenant does not exist, does nothing" do
assert :ok = Connect.shutdown("none")
end

test "tenant not able to connect if database has not enough connections" do
extensions = [
%{
"type" => "postgres_cdc_rls",
"settings" => %{
"db_host" => "localhost",
"db_name" => "postgres",
"db_user" => "supabase_admin",
"db_password" => "postgres",
"db_port" => "5433",
"poll_interval" => 100,
"poll_max_changes" => 100,
"poll_max_record_bytes" => 1_048_576,
"region" => "us-east-1",
"ssl_enforced" => false,
"db_pool" => 100,
"subcriber_pool_size" => 100,
"subs_pool_size" => 100
}
}
]

tenant = tenant_fixture(%{extensions: extensions})

assert {:error, :tenant_db_too_many_connections} =
Connect.lookup_or_start_connection(tenant.external_id)
end
end

defp check_db_connections_created(test_pid, tenant_id) do
Expand Down
32 changes: 31 additions & 1 deletion test/realtime_web/channels/realtime_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ defmodule RealtimeWeb.RealtimeChannelTest do
import ExUnit.CaptureLog

alias Phoenix.Socket
alias Realtime.Tenants.Authorization
alias RealtimeWeb.ChannelsAuthorization
alias RealtimeWeb.Joken.CurrentTime
alias RealtimeWeb.UserSocket
Expand Down Expand Up @@ -218,6 +217,37 @@ defmodule RealtimeWeb.RealtimeChannelTest do
assert {:error, %{reason: "Realtime was unable to connect to the project database"}} =
subscribe_and_join(socket, "realtime:test", %{})
end

test "lack of connections halts join" do
extensions = [
%{
"type" => "postgres_cdc_rls",
"settings" => %{
"db_host" => "localhost",
"db_name" => "postgres",
"db_user" => "supabase_admin",
"db_password" => "postgres",
"db_port" => "5433",
"poll_interval" => 100,
"poll_max_changes" => 100,
"poll_max_record_bytes" => 1_048_576,
"region" => "us-east-1",
"ssl_enforced" => false,
"db_pool" => 100,
"subcriber_pool_size" => 100,
"subs_pool_size" => 100
}
}
]

tenant = tenant_fixture(%{extensions: extensions})

{:ok, %Socket{} = socket} =
connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant.external_id))

assert {:error, %{reason: "Database can't accept more connections, Realtime won't connect"}} =
subscribe_and_join(socket, "realtime:test", %{})
end
end

defp conn_opts(tenant_id \\ @tenant_external_id, claims \\ %{}) do
Expand Down
Loading