Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ defmodule Realtime.Tenants.Connect do
{:error, {:already_started, _}} ->
get_status(tenant_id)

{:error, :killed} ->
log_error("UnableToConnectToTenantDatabase", "Unable to connect to tenant database")
{:error, :tenant_database_unavailable}

{:error, error} ->
log_error("UnableToConnectToTenantDatabase", error)
{:error, :tenant_database_unavailable}
Expand Down Expand Up @@ -183,12 +187,12 @@ defmodule Realtime.Tenants.Connect do
else
error ->
log_error("MigrationsFailedToRun", error)
{:stop, :kill, state}
{:stop, :shutdown, state}
end
rescue
error ->
log_error("MigrationsFailedToRun", error)
{:stop, :kill, state}
{:stop, :shutdown, state}
end

def handle_continue(:start_listen_and_replication, state) do
Expand All @@ -201,12 +205,12 @@ defmodule Realtime.Tenants.Connect do
else
{:error, error} ->
log_error("StartListenAndReplicationFailed", error)
{:stop, :kill, state}
{:stop, :shutdown, state}
end
rescue
error ->
log_error("StartListenAndReplicationFailed", error)
{:stop, :kill, state}
{:stop, :shutdown, state}
end

@impl true
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/tenants/replication_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ defmodule Realtime.Tenants.ReplicationConnection do

@impl true
def handle_disconnect(state) do
Logger.warning("Disconnecting broadcast changes handler: #{inspect(state, pretty: true)}")
Logger.warning("Disconnecting broadcast changes handler in the step : #{inspect(state.step)}")
{:noreply, %{state | step: :disconnected}}
end

Expand Down
2 changes: 1 addition & 1 deletion lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ defmodule RealtimeWeb.RealtimeChannel do
metadata = log_metadata(access_token)
push_system_message("system", socket, "error", message, channel_name)
log_warning("ChannelShutdown", message, metadata)
{:stop, :shutdown, socket}
{:stop, :normal, socket}
end

defp push_system_message(extension, socket, status, error, channel_name)
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.34.12",
version: "2.34.13",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/.template.env
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
PROJECT_URL=
PROJECT_ANON_TOKEN=
PROJECT_JWT_SECRET
PROJECT_JWT_SECRET=
7 changes: 4 additions & 3 deletions test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@

alias __MODULE__.Endpoint
alias Extensions.PostgresCdcRls, as: Rls
alias Phoenix.Socket.{V1, Message}
alias Phoenix.Socket.Message
alias Phoenix.Socket.V1
alias Postgrex, as: P
alias Realtime.Api.Tenant
alias Realtime.Database
Expand Down Expand Up @@ -119,7 +120,7 @@
ref: nil,
topic: ^topic
},
5000
8000

{:ok, _, conn} = Rls.get_manager_conn(@external_id)
P.query!(conn, "insert into test (details) values ('test')", [])
Expand Down Expand Up @@ -1114,7 +1115,7 @@
} do
change_tenant_configuration(:private_only, true)

Realtime.Tenants.Cache.invalidate_tenant_cache(@external_id)

Check warning on line 1118 in test/integration/rt_channel_test.exs

View workflow job for this annotation

GitHub Actions / Tests

Nested modules could be aliased at the top of the invoking module.

Process.sleep(100)

Expand Down Expand Up @@ -1332,7 +1333,7 @@
config = %{broadcast: %{self: true}, private: false}
realtime_topic = "realtime:#{random_string()}"

for _ <- 1..10 do
for _ <- 1..15 do
WebsocketClient.join(socket, realtime_topic, %{config: config})
end

Expand Down Expand Up @@ -1452,10 +1453,10 @@
end

def setup_trigger(%{tenant: tenant, topic: topic} = context) do
Realtime.Tenants.Connect.shutdown(@external_id)

Check warning on line 1456 in test/integration/rt_channel_test.exs

View workflow job for this annotation

GitHub Actions / Tests

Nested modules could be aliased at the top of the invoking module.
Process.sleep(500)

{:ok, db_conn} = Realtime.Tenants.Connect.connect(@external_id)

Check warning on line 1459 in test/integration/rt_channel_test.exs

View workflow job for this annotation

GitHub Actions / Tests

Nested modules could be aliased at the top of the invoking module.

random_name = String.downcase("test_#{random_string()}")
query = "CREATE TABLE #{random_name} (id serial primary key, details text)"
Expand Down Expand Up @@ -1492,7 +1493,7 @@
{:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop)
query = "DROP TABLE #{random_name} CASCADE"
Postgrex.query!(db_conn, query, [])
Realtime.Tenants.Connect.shutdown(db_conn)

Check warning on line 1496 in test/integration/rt_channel_test.exs

View workflow job for this annotation

GitHub Actions / Tests

Nested modules could be aliased at the top of the invoking module.

Process.sleep(500)
end)
Expand All @@ -1505,9 +1506,9 @@
defp change_tenant_configuration(limit, value) do
@external_id
|> Realtime.Tenants.get_tenant_by_external_id()
|> Realtime.Api.Tenant.changeset(%{limit => value})

Check warning on line 1509 in test/integration/rt_channel_test.exs

View workflow job for this annotation

GitHub Actions / Tests

Nested modules could be aliased at the top of the invoking module.
|> Realtime.Repo.update!()

Realtime.Tenants.Cache.invalidate_tenant_cache(@external_id)

Check warning on line 1512 in test/integration/rt_channel_test.exs

View workflow job for this annotation

GitHub Actions / Tests

Nested modules could be aliased at the top of the invoking module.
end
end
5 changes: 0 additions & 5 deletions test/realtime/monitoring/erl_sys_mon_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@ defmodule Realtime.ErlSysMonTest do
alias Realtime.ErlSysMon

describe "system monitoring" do
setup do
Logger.configure(level: :warning)
on_exit(fn -> Logger.configure(level: :error) end)
end

test "logs system monitor events" do
start_supervised!({ErlSysMon, [{:long_message_queue, {1, 10}}]})

Expand Down
3 changes: 0 additions & 3 deletions test/realtime/signal_handler_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ defmodule Realtime.SignalHandlerTest do
end

setup do
Logger.configure(level: :warning)

on_exit(fn ->
Logger.configure(level: :error)
Application.put_env(:realtime, :shutdown_in_progress, false)
end)
end
Expand Down
3 changes: 2 additions & 1 deletion test/realtime/telemetry/logger_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ defmodule Realtime.Telemetry.LoggerTest do
alias Realtime.Telemetry.Logger, as: TelemetryLogger

setup do
level = Logger.level()
Logger.configure(level: :info)
on_exit(fn -> Logger.configure(level: :error) end)
on_exit(fn -> Logger.configure(level: level) end)
end

describe "logger backend initialization" do
Expand Down
25 changes: 14 additions & 11 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ defmodule Realtime.Tenants.ConnectTest do
alias Realtime.UsersCounter

setup do
Cleanup.ensure_no_replication_slot()
tenant = tenant_fixture()
Cleanup.ensure_no_replication_slot()
%{tenant: tenant}
end

Expand All @@ -20,6 +20,7 @@ defmodule Realtime.Tenants.ConnectTest do
assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
Process.sleep(100)
assert is_pid(db_conn)
Connect.shutdown(tenant.external_id)
end

test "on database disconnect, returns new connection", %{tenant: tenant} do
Expand All @@ -33,6 +34,7 @@ defmodule Realtime.Tenants.ConnectTest do
on_exit(fn -> Process.exit(new_conn, :shutdown) end)

assert new_conn != old_conn
Connect.shutdown(tenant.external_id)
end

test "if tenant exists but unable to connect, returns error" do
Expand Down Expand Up @@ -71,13 +73,14 @@ defmodule Realtime.Tenants.ConnectTest do
Connect.lookup_or_start_connection(tenant_id, check_connected_user_interval: 100)

# Not enough time has passed, connection still alive
Process.sleep(500)
Process.sleep(400)
assert {_, %{conn: _}} = :syn.lookup(Connect, tenant_id)

# Enough time has passed, connection stopped
Process.sleep(1000)
assert :undefined = :syn.lookup(Connect, tenant_id)
refute Process.alive?(db_conn)
Connect.shutdown(tenant_id)
end

test "if users are connected to a tenant channel, keep the connection", %{
Expand All @@ -94,6 +97,8 @@ defmodule Realtime.Tenants.ConnectTest do
Process.sleep(300)
assert {^pid, %{conn: ^conn_pid}} = :syn.lookup(Connect, tenant_id)
assert Process.alive?(db_conn)

Connect.shutdown(tenant_id)
end

test "connection is killed after user leaving", %{
Expand All @@ -110,6 +115,7 @@ defmodule Realtime.Tenants.ConnectTest do
Process.sleep(1000)
assert :undefined = :syn.lookup(Connect, tenant_id)
refute Process.alive?(db_conn)
Connect.shutdown(tenant_id)
end

test "error if tenant is suspended" do
Expand All @@ -133,6 +139,7 @@ defmodule Realtime.Tenants.ConnectTest do
Realtime.Tenants.unsuspend_tenant_by_external_id(tenant.external_id)
Process.sleep(50)
assert {:ok, _} = Connect.lookup_or_start_connection(tenant.external_id)
Connect.shutdown(tenant.external_id)
end

test "properly handles of failing calls by avoid creating too many connections" do
Expand Down Expand Up @@ -178,13 +185,12 @@ defmodule Realtime.Tenants.ConnectTest do
end

test "starts broadcast handler and does not fail on existing connection", %{tenant: tenant} do
on_exit(fn -> Connect.shutdown(tenant.external_id) end)

assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
Process.sleep(3000)

replication_connection_before = ReplicationConnection.whereis(tenant.external_id)
listen_before = Listen.whereis(tenant.external_id)

assert Process.alive?(replication_connection_before)
assert Process.alive?(listen_before)

Expand All @@ -200,9 +206,8 @@ defmodule Realtime.Tenants.ConnectTest do
end

test "failed broadcast handler and listen recover from failure", %{tenant: tenant} do
on_exit(fn -> Connect.shutdown(tenant.external_id) end)
assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
Process.sleep(3000)
Process.sleep(1000)

replication_connection_pid = ReplicationConnection.whereis(tenant.external_id)
listen_pid = ReplicationConnection.whereis(tenant.external_id)
Expand All @@ -224,10 +229,9 @@ defmodule Realtime.Tenants.ConnectTest do
end

test "on database disconnect, connection is killed to all components", %{tenant: tenant} do
on_exit(fn -> Connect.shutdown(tenant.external_id) end)
assert {:ok, _db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
old_pid = Connect.whereis(tenant.external_id)
Process.sleep(3000)
Process.sleep(1000)

old_replication_connection_pid = ReplicationConnection.whereis(tenant.external_id)
old_listen_connection_pid = Listen.whereis(tenant.external_id)
Expand All @@ -252,15 +256,14 @@ defmodule Realtime.Tenants.ConnectTest do
describe "shutdown/1" do
test "shutdowns all associated connections", %{tenant: tenant} do
assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
Process.sleep(1000)

assert Process.alive?(db_conn)
Process.sleep(300)
assert Process.alive?(Connect.whereis(tenant.external_id))
assert Process.alive?(ReplicationConnection.whereis(tenant.external_id))
assert Process.alive?(Listen.whereis(tenant.external_id))

Connect.shutdown(tenant.external_id)
Process.sleep(1000)
Process.sleep(200)
refute Connect.whereis(tenant.external_id)
refute ReplicationConnection.whereis(tenant.external_id)
refute Listen.whereis(tenant.external_id)
Expand Down
3 changes: 2 additions & 1 deletion test/realtime_web/channels/realtime_channel/logging_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ defmodule RealtimeWeb.RealtimeChannel.LoggingTest do
alias RealtimeWeb.RealtimeChannel.Logging

setup do
level = Logger.level()
Logger.configure(level: :debug)
on_exit(fn -> Logger.configure(level: :error) end)
on_exit(fn -> Logger.configure(level: level) end)
end

describe "maybe_log_handle_info/2" do
Expand Down
28 changes: 21 additions & 7 deletions test/realtime_web/channels/realtime_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule RealtimeWeb.RealtimeChannelTest do
authorize_conn: fn _, _, _ ->
{:ok, %{"exp" => Joken.current_time() + 1_000, "role" => "postgres"}}
end do
{:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts())
{:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts())

socket = Socket.assign(socket, %{limits: %{@default_limits | max_concurrent_users: 1}})
assert {:ok, _, %Socket{}} = subscribe_and_join(socket, "realtime:test", %{})
Expand All @@ -43,7 +43,7 @@ defmodule RealtimeWeb.RealtimeChannelTest do
authorize_conn: fn _, _, _ ->
{:ok, %{"exp" => Joken.current_time() + 1_000, "role" => "postgres"}}
end do
{:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts())
{:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts())

socket_at_capacity =
Socket.assign(socket, %{limits: %{@default_limits | max_concurrent_users: 0}})
Expand All @@ -66,7 +66,7 @@ defmodule RealtimeWeb.RealtimeChannelTest do
authorize_conn: fn _, _, _ ->
{:ok, %{"exp" => Joken.current_time() + 1, "role" => "postgres"}}
end do
{:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts())
{:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts())

assert {:ok, _, %Socket{}} = subscribe_and_join(socket, "realtime:test", %{})
end
Expand All @@ -77,23 +77,27 @@ defmodule RealtimeWeb.RealtimeChannelTest do
authorize_conn: fn _, _, _ ->
{:ok, %{"exp" => Joken.current_time(), "role" => "postgres"}}
end do
{:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts())
{:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts())

assert capture_log(fn ->
assert {:error, %{reason: "Token expiration time is invalid"}} =
subscribe_and_join(socket, "realtime:test", %{})

Process.sleep(300)
end) =~ "InvalidJWTExpiration: Token expiration time is invalid"
end

with_mock ChannelsAuthorization, [],
authorize_conn: fn _, _, _ ->
{:ok, %{"exp" => Joken.current_time() - 1, "role" => "postgres"}}
end do
{:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts())
{:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts())

assert capture_log(fn ->
assert {:error, %{reason: "Token expiration time is invalid"}} =
subscribe_and_join(socket, "realtime:test", %{})

Process.sleep(300)
end) =~ "InvalidJWTExpiration: Token expiration time is invalid"
end
end
Expand All @@ -105,6 +109,8 @@ defmodule RealtimeWeb.RealtimeChannelTest do
capture_log(fn ->
assert {:error, :missing_claims} =
connect(UserSocket, %{"log_level" => "warning"}, conn_opts())

Process.sleep(300)
end)

assert log =~ "InvalidJWTToken: Fields `role` and `exp` are required in JWT"
Expand All @@ -121,6 +127,8 @@ defmodule RealtimeWeb.RealtimeChannelTest do
capture_log(fn ->
assert {:error, :missing_claims} =
connect(UserSocket, %{"log_level" => "warning"}, conn_opts)

Process.sleep(300)
end)

assert log =~ "InvalidJWTToken: Fields `role` and `exp` are required in JWT"
Expand All @@ -140,6 +148,8 @@ defmodule RealtimeWeb.RealtimeChannelTest do
capture_log(fn ->
assert {:error, :expired_token} =
connect(UserSocket, %{"log_level" => "warning"}, conn_opts)

Process.sleep(300)
end)

assert log =~ "InvalidJWTToken: Token as expired 1000 seconds ago"
Expand All @@ -156,6 +166,8 @@ defmodule RealtimeWeb.RealtimeChannelTest do
capture_log(fn ->
assert {:error, :expired_token} =
connect(UserSocket, %{"log_level" => "warning"}, conn_opts())

Process.sleep(300)
end)

assert log =~ "InvalidJWTToken: Token as expired 1000 seconds ago"
Expand All @@ -174,7 +186,7 @@ defmodule RealtimeWeb.RealtimeChannelTest do
end

test "successful connection proceeds with join" do
{:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts())
{:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts())
assert {:ok, _, %Socket{}} = subscribe_and_join(socket, "realtime:test", %{})
end

Expand All @@ -198,7 +210,9 @@ defmodule RealtimeWeb.RealtimeChannelTest do
]

tenant = tenant_fixture(%{extensions: extensions})
{:ok, %Socket{} = socket} = connect(UserSocket, %{}, conn_opts(tenant.external_id))

{:ok, %Socket{} = socket} =
connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant.external_id))

assert {:error, %{reason: "Realtime was unable to connect to the project database"}} =
subscribe_and_join(socket, "realtime:test", %{})
Expand Down
Loading
Loading