Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,13 @@ This is the list of operational codes that can help you understand your deployme
| UnableToSetPolicies | Error when setting up Authorization Policies |
| UnableCheckoutConnection | Error when trying to checkout a connection from the tenant pool |
| UnableToSubscribeToPostgres | Error when trying to subscribe to Postgres changes |
| ReconnectSubscribeToPostgres | Postgres changes still waiting to be subscribed |
| ChannelRateLimitReached | The number of channels you can create has reached its limit |
| ConnectionRateLimitReached | The number of connected clients as reached its limit |
| ClientJoinRateLimitReached | The rate of joins per second from your clients as reached the channel limits |
| RealtimeDisabledForTenant | Realtime has been disabled for the tenant |
| UnableToConnectToTenantDatabase | Realtime was not able to connect to the tenant's database |
| DatabaseLackOfConnections | Realtime was not able to connect to the tenant's database due to not having enough available connections |
| TooManyConnectAttempts | Realtime restricted the amount of attempts when connecting to the tenants database |
| RealtimeNodeDisconnected | Realtime is a distributed application and this means that one the system is unable to communicate with one of the distributed nodes |
| MigrationsFailedToRun | Error when running the migrations against the Tenant database that are required by Realtime |
| StartListenAndReplicationFailed | Error when starting the replication and listening of errors for database broadcasting |
Expand All @@ -246,7 +246,6 @@ This is the list of operational codes that can help you understand your deployme
| UnableToUpdateCounter | Error when trying to update a counter to track rate limits for a tenant |
| UnableToFindCounter | Error when trying to find a counter to track rate limits for a tenant |
| UnhandledProcessMessage | Unhandled message received by a Realtime process |
| UnableToSetPolicies | We were not able to set policies for this connection |
| UnableToTrackPresence | Error when handling track presence for this socket |
| UnknownPresenceEvent | Presence event type not recognized by service |
| IncreaseConnectionPool | The number of connections you have set for Realtime are not enough to handle your current use case |
Expand All @@ -256,7 +255,7 @@ This is the list of operational codes that can help you understand your deployme
| UnableToConnectToProject | Unable to connect to Project database |
| InvalidJWTExpiration | JWT exp claim value it's incorrect |
| JwtSignatureError | JWT signature was not able to be validated |
| MalformedJWT | Token received does not comply with the JWT format |
| MalformedJWT | Token received does not comply with the JWT format |
| Unauthorized | Unauthorized access to Realtime channel |
| RealtimeRestarting | Realtime is currently restarting |
| UnableToProcessListenPayload | Payload sent in NOTIFY operation was JSON parsable |
Expand Down
3 changes: 0 additions & 3 deletions lib/extensions/postgres_cdc_rls/subscriptions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,12 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
msg =
"Unable to subscribe to changes with given parameters. Please check Realtime is enabled for the given connect parameters: [#{params_to_log(params)}]"

log_warning("RealtimeDisabledForConfiguration", msg)
rollback(conn, msg)

{:error, exception} ->
msg =
"Unable to subscribe to changes with given parameters. An exception happened so please check your connect parameters: [#{params_to_log(params)}]. Exception: #{Exception.message(exception)}"

log_error("RealtimeSubscriptionError", msg)

rollback(conn, msg)
end

Expand Down
9 changes: 0 additions & 9 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ defmodule Realtime.Tenants.Connect do
alias Realtime.Api.Tenant
alias Realtime.Rpc
alias Realtime.Tenants
alias Realtime.Tenants.Connect.Backoff
alias Realtime.Tenants.Connect.CheckConnection
alias Realtime.Tenants.Connect.GetTenant
alias Realtime.Tenants.Connect.Piper
Expand Down Expand Up @@ -125,10 +124,6 @@ defmodule Realtime.Tenants.Connect do
{:error, {:shutdown, :tenant_not_found}} ->
{:error, :tenant_not_found}

{:error, {:shutdown, :tenant_create_backoff}} ->
log_warning("TooManyConnectAttempts", "Too many connect attempts to tenant database", metadata)
{:error, :tenant_create_backoff}

{:error, :shutdown} ->
log_error("UnableToConnectToTenantDatabase", "Unable to connect to tenant database", metadata)
{:error, :tenant_database_unavailable}
Expand Down Expand Up @@ -192,7 +187,6 @@ defmodule Realtime.Tenants.Connect do

pipes = [
GetTenant,
Backoff,
CheckConnection,
StartCounters,
RegisterProcess
Expand All @@ -208,9 +202,6 @@ defmodule Realtime.Tenants.Connect do
{:error, :tenant_db_too_many_connections} ->
{:stop, {:shutdown, :tenant_db_too_many_connections}}

{:error, :tenant_create_backoff} ->
{:stop, {:shutdown, :tenant_create_backoff}}

{:error, error} ->
log_error("UnableToConnectToTenantDatabase", error)
{:stop, :shutdown}
Expand Down
38 changes: 0 additions & 38 deletions lib/realtime/tenants/connect/backoff.ex

This file was deleted.

14 changes: 10 additions & 4 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -278,22 +278,28 @@ defmodule RealtimeWeb.RealtimeChannel do
case PostgresCdc.after_connect(module, response, postgres_extension, pg_change_params) do
{:ok, _response} ->
message = "Subscribed to PostgreSQL"
Logger.info(message)
Logging.maybe_log_info(socket, message)
push_system_message("postgres_changes", socket, "ok", message, channel_name)
{:noreply, assign(socket, :pg_sub_ref, nil)}

error ->
log_warning("UnableToSubscribeToPostgres", error)
Logging.maybe_log_warning(socket, "RealtimeDisabledForConfiguration", error)

push_system_message("postgres_changes", socket, "error", error, channel_name)
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))}
end

nil ->
Logger.warning("Re-connecting to PostgreSQL with params: " <> inspect(pg_change_params))
Logging.maybe_log_warning(
socket,
"ReconnectSubscribeToPostgres",
"Re-connecting to PostgreSQL with params: " <> inspect(pg_change_params)
)

{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe())}

error ->
log_warning("UnableToSubscribeToPostgres", error)
Logging.maybe_log_error(socket, "UnableToSubscribeToPostgres", error)
push_system_message("postgres_changes", socket, "error", error, channel_name)
{:noreply, assign(socket, :pg_sub_ref, postgres_subscribe(5, 10))}
end
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime_web/channels/realtime_channel/assign.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ defmodule RealtimeWeb.RealtimeChannel.Assigns do

@type t :: %__MODULE__{
tenant: String.t(),
log_level: atom(),
log_level: Logger.level(),
rate_counter: Realtime.RateCounter.t(),
limits: %{
max_events_per_second: integer(),
Expand Down
29 changes: 29 additions & 0 deletions lib/realtime_web/channels/realtime_channel/logging.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,35 @@ defmodule RealtimeWeb.RealtimeChannel.Logging do

alias Realtime.Telemetry

@doc """
Checks if the log level set in the socket is less than or equal to the given level of the message to be logged.
"""
@spec maybe_log(
socket :: Phoenix.Socket.t(),
level :: Logger.level(),
code :: binary(),
msg :: binary(),
metadata :: keyword()
) :: :ok
def maybe_log(%{assigns: %{log_level: log_level}}, level, code, msg, metadata \\ []) do
metadata = if metadata == [], do: Logger.metadata()

if Logger.compare_levels(log_level, level) != :gt do
Logger.log(level, "#{code}: #{msg}", metadata)
end
end

def maybe_log_error(socket, code, msg, metadata \\ []), do: maybe_log(socket, :error, code, msg, metadata)
def maybe_log_warning(socket, code, msg, metadata \\ []), do: maybe_log(socket, :warning, code, msg, metadata)

def maybe_log_info(%{assigns: %{log_level: log_level}}, msg, metadata \\ []) do
metadata = if metadata == [], do: Logger.metadata()

if Logger.compare_levels(log_level, :info) != :gt do
Logger.info(inspect(msg), metadata)
end
end

@doc """
Logs messages according to user options given on config
"""
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.59.3",
version: "2.59.4",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
40 changes: 0 additions & 40 deletions test/realtime/tenants/connect/backoff_test.exs

This file was deleted.

37 changes: 0 additions & 37 deletions test/realtime/tenants/connect_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -348,43 +348,6 @@ defmodule Realtime.Tenants.ConnectTest do
end
end

describe "connect/1" do
test "respects backoff pipe", %{tenant: tenant} do
external_id = tenant.external_id
region = Tenants.region(tenant)

log =
capture_log(fn ->
for _ <- 1..10 do
Connect.connect(external_id, region)
Process.sleep(10)
Connect.shutdown(external_id)
end

assert {:error, :tenant_create_backoff} = Connect.connect(external_id, region)
end)

assert log =~ "Too many connect attempts to tenant database"
assert log =~ "project=#{external_id} external_id=#{external_id} [warning] TooManyConnectAttempts"
end

test "after timer, is able to connect", %{tenant: tenant} do
external_id = tenant.external_id
region = Tenants.region(tenant)

for _ <- 1..10 do
Connect.connect(external_id, region)
Process.sleep(10)
Connect.shutdown(external_id)
end

assert {:error, :tenant_create_backoff} = Connect.connect(external_id, region)

Process.sleep(5000)
assert {:ok, _pid} = Connect.connect(external_id, region)
end
end

describe "shutdown/1" do
test "shutdowns all associated connections", %{tenant: tenant} do
assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id)
Expand Down
70 changes: 70 additions & 0 deletions test/realtime_web/channels/realtime_channel/logging_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,74 @@ defmodule RealtimeWeb.RealtimeChannel.LoggingTest do
refute_receive {[:realtime, :channel, :error], %{code: "DatabaseConnectionIssue"}, %{code: "UnableToSetPolicies"}}
end
end

describe "maybe_log/3" do
test "logs messages at the specified level" do
socket = %{assigns: %{log_level: :info}}

assert capture_log(fn ->
Logging.maybe_log(socket, :info, "TestCode", "test message")
end) =~ "TestCode: test message"

assert capture_log(fn ->
Logging.maybe_log(socket, :error, "TestError", "test error")
end) =~ "TestError: test error"
end

test "does not log messages when log level is higher than the configured level" do
socket = %{assigns: %{log_level: :error}}

assert capture_log(fn ->
Logging.maybe_log(socket, :info, "TestCode", "test message")
end) == ""
end
end

describe "maybe_log_error/3" do
test "logs error messages at the error level" do
socket = %{assigns: %{log_level: :info}}

assert capture_log(fn ->
Logging.maybe_log_error(socket, "TestError", "test error")
end) =~ "TestError: test error"
end

test "does not log when log level is higher than error" do
socket = %{assigns: %{log_level: :emergency}}

assert capture_log(fn ->
Logging.maybe_log_error(socket, "TestError", "test error")
end) == ""
end
end

describe "maybe_log_warning/3" do
test "logs warning messages at the warning level" do
socket = %{assigns: %{log_level: :warning}}

assert capture_log(fn ->
Logging.maybe_log_warning(socket, "TestWarning", "test warning")
end) =~ "TestWarning: test warning"
end

test "does not log when log level is higher than warning" do
socket = %{assigns: %{log_level: :error}}

assert capture_log(fn -> Logging.maybe_log_warning(socket, "TestWarning", "test warning") end) == ""
end
end

describe "maybe_log_info/3" do
test "logs info messages at the info level" do
socket = %{assigns: %{log_level: :info}}

assert capture_log(fn -> Logging.maybe_log_info(socket, "test info") end) =~ "test info"
end

test "does not log when log level is higher than info" do
socket = %{assigns: %{log_level: :warning}}

assert capture_log(fn -> Logging.maybe_log_info(socket, "test info") end) == ""
end
end
end
Loading