From f4e299f8ff635a22c2c17e3aed9e7f24a16dbb76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Mon, 17 Feb 2025 22:32:32 +0000 Subject: [PATCH 1/2] fix: create containers per tenant Using docker we create containers per tenant_fixture to properly parallize testing and isolate DB changes from impacting other tests --- .github/workflows/tests.yml | 2 +- config/config.exs | 2 +- config/runtime.exs | 2 +- config/test.exs | 6 +- lib/realtime/context_cache.ex | 11 +- lib/realtime/tenants/connect.ex | 17 +- .../tenants/connect/check_connection.ex | 7 +- mix.exs | 1 - priv/repo/seeds.exs | 2 +- priv/repo/seeds_after_migration.exs | 58 ----- rel/overlays/config.example.yml | 2 +- test/api_jwt_secret_test.exs | 3 +- test/integration/rt_channel_test.exs | 16 +- test/realtime/api_test.exs | 203 ++++++++++-------- .../cluster_strategy/postgres_test.exs | 2 +- test/realtime/database_test.exs | 32 +-- .../extensions/cdc_rls/cdc_rls_test.exs | 3 +- .../extensions/cdc_rls/subscriptions_test.exs | 169 ++++----------- test/realtime/helpers_test.exs | 3 +- test/realtime/messages_test.exs | 8 +- test/realtime/metrics_cleaner_test.exs | 4 +- test/realtime/nodes_test.exs | 7 +- test/realtime/repo_test.exs | 135 +----------- test/realtime/signal_handler_test.exs | 13 +- test/realtime/tenants/authorization_test.exs | 12 +- .../tenants/cache_supervisor_test.exs | 2 +- test/realtime/tenants/cache_test.exs | 29 ++- test/realtime/tenants/connect_test.exs | 118 +++++----- test/realtime/tenants/janitor_test.exs | 24 +-- test/realtime/tenants/listen_test.exs | 73 +++---- test/realtime/tenants/migrations_test.exs | 16 +- .../tenants/replication_connection_test.exs | 102 ++++----- test/realtime/tenants_test.exs | 43 ++-- .../auth/channels_authorization_test.exs | 7 +- .../channels/auth/jwt_verification_test.exs | 5 +- .../broadcast_handler_test.exs | 30 ++- .../presence_handler_test.exs | 28 ++- .../channels/realtime_channel_test.exs | 17 +- .../controllers/broadcast_controller_test.exs | 182 ++++++---------- .../controllers/tenant_controller_test.exs | 55 +++-- .../realtime_web/plugs/assign_tenant_test.exs | 8 +- test/support/conn_case.ex | 5 +- test/support/containers.ex | 148 +++++++++++++ test/support/data_case.ex | 5 +- test/support/generators.ex | 11 +- test/test_helper.exs | 59 ++++- 46 files changed, 791 insertions(+), 896 deletions(-) delete mode 100644 priv/repo/seeds_after_migration.exs create mode 100644 test/support/containers.ex diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index fb343cd4f..d234865fc 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -61,6 +61,6 @@ jobs: - name: Start epmd run: epmd -daemon - name: Run tests - run: MIX_ENV=test mix coveralls.github --trace + run: MIX_ENV=test MAX_CASES=2 mix coveralls.github --trace env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/config/config.exs b/config/config.exs index da725e7b1..ecee1f689 100644 --- a/config/config.exs +++ b/config/config.exs @@ -13,7 +13,7 @@ config :realtime, # Configures the endpoint config :realtime, RealtimeWeb.Endpoint, - url: [host: "localhost"], + url: [host: "127.0.0.1"], secret_key_base: "ktyW57usZxrivYdvLo9os7UGcUUZYKchOMHT3tzndmnHuxD09k+fQnPUmxlPMUI3", render_errors: [view: RealtimeWeb.ErrorView, accepts: ~w(html json), layout: false], pubsub_server: Realtime.PubSub, diff --git a/config/runtime.exs b/config/runtime.exs index 1b3b7d149..1f57c0422 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -4,7 +4,7 @@ config :logflare_logger_backend, url: System.get_env("LOGFLARE_LOGGER_BACKEND_URL", "https://api.logflare.app") app_name = System.get_env("APP_NAME", "") -default_db_host = System.get_env("DB_HOST", "localhost") +default_db_host = System.get_env("DB_HOST", "127.0.0.1") username = System.get_env("DB_USER", "postgres") password = System.get_env("DB_PASSWORD", "postgres") database = System.get_env("DB_NAME", "postgres") diff --git a/config/test.exs b/config/test.exs index f57c79836..3a11e5e17 100644 --- a/config/test.exs +++ b/config/test.exs @@ -21,7 +21,7 @@ for repo <- [ username: "postgres", password: "postgres", database: "realtime_test", - hostname: "localhost", + hostname: "127.0.0.1", pool: Ecto.Adapters.SQL.Sandbox end @@ -43,7 +43,9 @@ config :joken, current_time_adapter: RealtimeWeb.Joken.CurrentTime.Mock # Print only errors during test -config :logger, level: :warning +config :logger, + compile_time_purge_matching: [[module: Postgrex], [module: DBConnection]], + level: :warning # Configures Elixir's Logger config :logger, :console, diff --git a/lib/realtime/context_cache.ex b/lib/realtime/context_cache.ex index 55f5aeee4..afacf4ce1 100644 --- a/lib/realtime/context_cache.ex +++ b/lib/realtime/context_cache.ex @@ -9,14 +9,9 @@ defmodule Realtime.ContextCache do cache = cache_name(context) cache_key = {{fun, arity}, args} - case Cachex.fetch(cache, cache_key, fn {{_fun, _arity}, args} -> - {:commit, {:cached, apply(context, fun, args)}} - end) do - {:commit, {:cached, value}} -> - value - - {:ok, {:cached, value}} -> - value + case Cachex.fetch(cache, cache_key, fn {{_fun, _arity}, args} -> {:commit, {:cached, apply(context, fun, args)}} end) do + {:commit, {:cached, value}} -> value + {:ok, {:cached, value}} -> value end end diff --git a/lib/realtime/tenants/connect.ex b/lib/realtime/tenants/connect.ex index cdf08f4cb..d3080be1e 100644 --- a/lib/realtime/tenants/connect.ex +++ b/lib/realtime/tenants/connect.ex @@ -25,12 +25,6 @@ defmodule Realtime.Tenants.Connect do alias Realtime.Tenants.Migrations alias Realtime.UsersCounter - @pipes [ - GetTenant, - CheckConnection, - StartCounters, - RegisterProcess - ] @rpc_timeout_default 30_000 @check_connected_user_interval_default 50_000 @connected_users_bucket_shutdown [0, 0, 0, 0, 0, 0] @@ -52,7 +46,7 @@ defmodule Realtime.Tenants.Connect do | {:error, :initializing} | {:error, :tenant_database_connection_initializing} | {:error, :rpc_error, term()} - def lookup_or_start_connection(tenant_id, opts \\ []) do + def lookup_or_start_connection(tenant_id, opts \\ []) when is_binary(tenant_id) do case get_status(tenant_id) do {:ok, conn} -> {:ok, conn} @@ -174,7 +168,14 @@ defmodule Realtime.Tenants.Connect do def init(%{tenant_id: tenant_id} = state) do Logger.metadata(external_id: tenant_id, project: tenant_id) - case Piper.run(@pipes, state) do + pipes = [ + GetTenant, + CheckConnection, + StartCounters, + RegisterProcess + ] + + case Piper.run(pipes, state) do {:ok, acc} -> {:ok, acc, {:continue, :run_migrations}} diff --git a/lib/realtime/tenants/connect/check_connection.ex b/lib/realtime/tenants/connect/check_connection.ex index 8f958379d..2e8fdc1cb 100644 --- a/lib/realtime/tenants/connect/check_connection.ex +++ b/lib/realtime/tenants/connect/check_connection.ex @@ -10,11 +10,8 @@ defmodule Realtime.Tenants.Connect.CheckConnection do %{tenant: tenant} = acc case Database.check_tenant_connection(tenant) do - {:ok, conn} -> - {:ok, %{acc | db_conn_pid: conn, db_conn_reference: Process.monitor(conn)}} - - {:error, error} -> - {:error, error} + {:ok, conn} -> {:ok, %{acc | db_conn_pid: conn, db_conn_reference: Process.monitor(conn)}} + {:error, error} -> {:error, error} end end end diff --git a/mix.exs b/mix.exs index d9d562449..3058883eb 100644 --- a/mix.exs +++ b/mix.exs @@ -101,7 +101,6 @@ defmodule Realtime.MixProject do "ecto.create --quiet", "run priv/repo/seeds_before_migration.exs", "ecto.migrate --migrations-path=priv/repo/migrations", - "run priv/repo/seeds_after_migration.exs", "test" ], "assets.deploy": ["esbuild default --minify", "tailwind default --minify", "phx.digest"] diff --git a/priv/repo/seeds.exs b/priv/repo/seeds.exs index 93ca695e4..0f61b119f 100644 --- a/priv/repo/seeds.exs +++ b/priv/repo/seeds.exs @@ -4,7 +4,7 @@ import Ecto.Adapters.SQL, only: [query: 3] tenant_name = System.get_env("SELF_HOST_TENANT_NAME", "realtime-dev") env = if :ets.whereis(Mix.State) != :undefined, do: Mix.env(), else: :prod -default_db_host = if env in [:dev, :test], do: "localhost", else: "host.docker.internal" +default_db_host = if env in [:dev, :test], do: "127.0.0.1", else: "host.docker.internal" Repo.transaction(fn -> case Repo.get_by(Tenant, external_id: tenant_name) do diff --git a/priv/repo/seeds_after_migration.exs b/priv/repo/seeds_after_migration.exs deleted file mode 100644 index 68da74d4a..000000000 --- a/priv/repo/seeds_after_migration.exs +++ /dev/null @@ -1,58 +0,0 @@ -alias Realtime.Api -alias Realtime.Repo -import Ecto.Adapters.SQL, only: [query: 3] - -publication = "supabase_realtime_test" - -db_conf = Application.get_env(:realtime, Repo) - -tenant_name = "dev_tenant" - -if Api.get_tenant_by_external_id(tenant_name) do - Api.delete_tenant_by_external_id(tenant_name) -end - -%{ - "name" => tenant_name, - "extensions" => [ - %{ - "type" => "postgres_cdc_rls", - "settings" => %{ - "db_host" => db_conf[:hostname], - "db_name" => db_conf[:database], - "db_user" => db_conf[:username], - "db_password" => db_conf[:password], - "db_port" => "5432", - "poll_interval_ms" => 100, - "poll_max_changes" => 100, - "poll_max_record_bytes" => 1_048_576, - "publication" => publication, - "region" => "us-east-1", - "ssl_enforced" => false - } - } - ], - "external_id" => tenant_name, - "jwt_secret" => "secure_jwt_secret" -} -|> Api.create_tenant() - -query(Repo, "drop publication #{publication}", []) - -{:ok, _} = - Repo.transaction(fn -> - [ - "drop table if exists \"public\".\"test\";", - "create sequence if not exists test_id_seq;", - "create table \"public\".\"test\" ( - \"id\" int4 not null default nextval('test_id_seq'::regclass), - \"details\" text, - primary key (\"id\") - );", - "grant all on table public.test to anon;", - "grant all on table public.test to postgres;", - "grant all on table public.test to authenticated;", - "create publication #{publication} for all tables" - ] - |> Enum.each(&query(Repo, &1, [])) - end) diff --git a/rel/overlays/config.example.yml b/rel/overlays/config.example.yml index 826fb53ca..cfef9d21a 100644 --- a/rel/overlays/config.example.yml +++ b/rel/overlays/config.example.yml @@ -1,6 +1,6 @@ endpoint_port: 4000 db_repo: - - hostname: "localhost" + - hostname: "127.0.0.1" username: "postgres" password: "postgres" database: "postgres" diff --git a/test/api_jwt_secret_test.exs b/test/api_jwt_secret_test.exs index 4fa3dc3bb..e451e10d9 100644 --- a/test/api_jwt_secret_test.exs +++ b/test/api_jwt_secret_test.exs @@ -1,5 +1,6 @@ defmodule RealtimeWeb.ApiJwtSecretTest do - use RealtimeWeb.ConnCase + # async: false due to usage of mock + use RealtimeWeb.ConnCase, async: false import Mock alias RealtimeWeb.JwtVerification diff --git a/test/integration/rt_channel_test.exs b/test/integration/rt_channel_test.exs index 489a568a7..749146aae 100644 --- a/test/integration/rt_channel_test.exs +++ b/test/integration/rt_channel_test.exs @@ -1,8 +1,7 @@ Code.require_file("../support/websocket_client.exs", __DIR__) defmodule Realtime.Integration.RtChannelTest do - # async: false due to the fact that multiple operations against the database will use the same connection - + # async: false due to the fact that multiple operations against the same tenant and usage of mocks use RealtimeWeb.ConnCase, async: false import ExUnit.CaptureLog import Generators @@ -14,16 +13,14 @@ defmodule Realtime.Integration.RtChannelTest do alias Phoenix.Socket.Message alias Phoenix.Socket.V1 alias Postgrex - alias Realtime.Api.Tenant alias Realtime.Database alias Realtime.Integration.RtChannelTest.Endpoint alias Realtime.Integration.WebsocketClient alias Realtime.RateCounter - alias Realtime.Repo alias Realtime.Tenants alias Realtime.Tenants.Authorization alias Realtime.Tenants.Cache - alias Realtime.Tenants.Migrations + @moduletag :capture_log @port 4002 @serializer V1.JSONSerializer @@ -78,8 +75,9 @@ defmodule Realtime.Integration.RtChannelTest do RateCounter.stop(@external_id) Cache.invalidate_tenant_cache(@external_id) Process.sleep(500) - [tenant] = Tenant |> Repo.all() |> Repo.preload(:extensions) - :ok = Migrations.run_migrations(tenant) + + tenant = Tenants.get_tenant_by_external_id(@external_id) + %{tenant: tenant} end @@ -1265,7 +1263,7 @@ defmodule Realtime.Integration.RtChannelTest do get_connection("authenticated", %{:exp => System.system_time(:second) - 1000}) end) - assert log =~ "InvalidJWTToken: Token as expired 1000 seconds ago" + assert log =~ "InvalidJWTToken: Token as expired" end end @@ -1466,7 +1464,7 @@ defmodule Realtime.Integration.RtChannelTest do claims = Map.merge( %{ - ref: "localhost", + ref: "127.0.0.1", iat: System.system_time(:second), exp: System.system_time(:second) + 604_800 }, diff --git a/test/realtime/api_test.exs b/test/realtime/api_test.exs index 0ce268acd..cacc48022 100644 --- a/test/realtime/api_test.exs +++ b/test/realtime/api_test.exs @@ -1,4 +1,5 @@ defmodule Realtime.ApiTest do + # async: false due to the fact that interacts with Realtime.Repo which means it might capture more entries than expected and due to usage of mocks use Realtime.DataCase, async: false import Mock @@ -9,101 +10,86 @@ defmodule Realtime.ApiTest do alias Realtime.Crypto alias Realtime.GenCounter alias Realtime.RateCounter - @db_conf Application.compile_env(:realtime, Realtime.Repo) - @valid_attrs %{ - external_id: "external_id", - name: "localhost", - extensions: [ - %{ - "type" => "postgres_cdc_rls", - "settings" => %{ - "db_host" => @db_conf[:hostname], - "db_name" => @db_conf[:database], - "db_user" => @db_conf[:username], - "db_password" => @db_conf[:password], - "db_port" => "5433", - "poll_interval" => 100, - "poll_max_changes" => 100, - "poll_max_record_bytes" => 1_048_576, - "region" => "us-east-1" - } - } - ], - postgres_cdc_default: "postgres_cdc_rls", - jwt_secret: "new secret", - max_concurrent_users: 200, - max_events_per_second: 100 - } - - @update_attrs %{ - external_id: "external_id1", - jwt_secret: "some updated jwt_secret", - name: "some updated name" - } - @invalid_attrs %{external_id: nil, jwt_secret: nil, name: nil} - setup do - start_supervised(Realtime.RateCounter.DynamicSupervisor) - start_supervised(Realtime.GenCounter.DynamicSupervisor) - - tenants = [ - tenant_fixture(%{external_id: "external_id1", max_concurrent_users: 10}), - tenant_fixture(%{external_id: "external_id2", max_concurrent_users: 5}), - tenant_fixture(%{external_id: "external_id3", max_concurrent_users: 15}), - tenant_fixture(%{external_id: "external_id4", max_concurrent_users: 20}), - tenant_fixture(%{external_id: "external_id5", max_concurrent_users: 25}) - ] + start_supervised(Realtime.RateCounter) + start_supervised(Realtime.GenCounter) - dev_tenant = Api.list_tenants(search: "dev_tenant") - tenants = tenants ++ dev_tenant - - Enum.each(tenants, fn tenant -> - :ok = - Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant.external_id) - end) + tenant_fixture(%{max_concurrent_users: 10_000_000}) + tenant_fixture(%{max_concurrent_users: 25_000_000}) + tenants = Api.list_tenants() %{tenants: tenants} end - describe "tenants" do - test "list_tenants/0 returns all tenants", %{tenants: tenants} do + describe "list_tenants/0" do + test "returns all tenants", %{tenants: tenants} do assert Enum.sort(Api.list_tenants()) == Enum.sort(tenants) end + end + describe "list_tenants/1" do test "list_tenants/1 returns filtered tenants", %{tenants: tenants} do assert hd(Api.list_tenants(search: hd(tenants).external_id)) == hd(tenants) - assert Api.list_tenants(order_by: "max_concurrent_users") == - Enum.sort_by(tenants, & &1.max_concurrent_users, :desc) - - assert Api.list_tenants(order_by: "max_concurrent_users", order: "asc") == - Enum.sort_by(tenants, & &1.max_concurrent_users, :asc) - - assert Api.list_tenants(order_by: "max_concurrent_users", order: "asc", limit: 2) == - tenants |> Enum.sort_by(& &1.max_concurrent_users, :asc) |> Enum.take(2) + assert Api.list_tenants(order_by: "max_concurrent_users", order: "desc", limit: 2) == + tenants |> Enum.sort_by(& &1.max_concurrent_users, :desc) |> Enum.take(2) end + end - test "get_tenant!/1 returns the tenant with given id", %{tenants: [tenant | _]} do - result = Api.get_tenant!(tenant.id) |> Map.delete(:extensions) + describe "get_tenant!/1" do + test "returns the tenant with given id", %{tenants: [tenant | _]} do + result = tenant.id |> Api.get_tenant!() |> Map.delete(:extensions) expected = tenant |> Map.delete(:extensions) assert result == expected end + end - test "create_tenant/1 with valid data creates a tenant" do - assert {:ok, %Tenant{} = tenant} = Api.create_tenant(@valid_attrs) + describe "create_tenant/1" do + test "valid data creates a tenant" do + port = Enum.random(5500..8000) + external_id = random_string() + + valid_attrs = %{ + external_id: external_id, + name: external_id, + extensions: [ + %{ + "type" => "postgres_cdc_rls", + "settings" => %{ + "db_host" => @db_conf[:hostname], + "db_name" => @db_conf[:database], + "db_user" => @db_conf[:username], + "db_password" => @db_conf[:password], + "db_port" => "#{port}", + "poll_interval" => 100, + "poll_max_changes" => 100, + "poll_max_record_bytes" => 1_048_576, + "region" => "us-east-1" + } + } + ], + postgres_cdc_default: "postgres_cdc_rls", + jwt_secret: "new secret", + max_concurrent_users: 200, + max_events_per_second: 100 + } - assert tenant.external_id == "external_id" + assert {:ok, %Tenant{} = tenant} = Api.create_tenant(valid_attrs) + + assert tenant.external_id == external_id assert tenant.jwt_secret == "YIriPuuJO1uerq5hSZ1W5Q==" - assert tenant.name == "localhost" + assert tenant.name == external_id end - test "create_tenant/1 with invalid data returns error changeset" do - assert {:error, %Ecto.Changeset{}} = Api.create_tenant(@invalid_attrs) + test "invalid data returns error changeset" do + assert {:error, %Ecto.Changeset{}} = Api.create_tenant(%{external_id: nil, jwt_secret: nil, name: nil}) end + end - test "check get_tenant_by_external_id/1", %{tenants: [tenant | _]} do + describe "get_tenant_by_external_id/1" do + test "fetch by external id", %{tenants: [tenant | _]} do %Tenant{extensions: [%Extensions{} = extension]} = Api.get_tenant_by_external_id(tenant.external_id) @@ -111,61 +97,85 @@ defmodule Realtime.ApiTest do password = extension.settings["db_password"] assert ^password = "v1QVng3N+pZd/0AEObABwg==" end + end - test "update_tenant/2 with valid data updates the tenant", %{tenants: [tenant | _]} do - assert {:ok, %Tenant{} = tenant} = Api.update_tenant(tenant, @update_attrs) - assert tenant.external_id == "external_id1" + describe "update_tenant/2" do + test "valid data updates the tenant" do + tenant = tenant_fixture() + + update_attrs = %{ + external_id: tenant.external_id, + jwt_secret: "some updated jwt_secret", + name: "some updated name" + } + + assert {:ok, %Tenant{} = tenant} = Api.update_tenant(tenant, update_attrs) + assert tenant.external_id == tenant.external_id assert tenant.jwt_secret == Crypto.encrypt!("some updated jwt_secret") assert tenant.name == "some updated name" end - test "update_tenant/2 with invalid data returns error changeset", %{tenants: [tenant | _]} do - assert {:error, %Ecto.Changeset{}} = Api.update_tenant(tenant, @invalid_attrs) + test "invalid data returns error changeset", %{tenants: [tenant | _]} do + assert {:error, %Ecto.Changeset{}} = Api.update_tenant(tenant, %{external_id: nil, jwt_secret: nil, name: nil}) end - test "update_tenant/2 with valid data and jwks change will send disconnect event", %{ - tenants: [tenant | _] - } do + test "valid data and jwks change will send disconnect event" do + tenant = tenant_fixture() + :ok = Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant.external_id) + assert {:ok, %Tenant{}} = Api.update_tenant(tenant, %{jwt_jwks: %{keys: ["test"]}}) - assert_receive :disconnect + assert_receive :disconnect, 500 end - test "update_tenant/2 with valid data and jwt_secret change will send disconnect event", %{ - tenants: [tenant | _] - } do + test "valid data and jwt_secret change will send disconnect event" do + tenant = tenant_fixture() + :ok = Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant.external_id) + assert {:ok, %Tenant{}} = Api.update_tenant(tenant, %{jwt_secret: "potato"}) - assert_receive :disconnect + + assert_receive :disconnect, 500 end - test "update_tenant/2 with valid data but not updating jwt_secret or jwt_jwks won't send event", - %{ - tenants: [tenant | _] - } do + test "valid data but not updating jwt_secret or jwt_jwks won't send event" do + tenant = tenant_fixture() + :ok = Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:" <> tenant.external_id) + assert {:ok, %Tenant{}} = Api.update_tenant(tenant, %{max_events_per_second: 100}) - refute_receive :disconnect + refute_receive :disconnect, 500 end + end - test "delete_tenant/1 deletes the tenant", %{tenants: [tenant | _]} do + describe "delete_tenant/1" do + test "deletes the tenant" do + tenant = tenant_fixture() assert {:ok, %Tenant{}} = Api.delete_tenant(tenant) assert_raise Ecto.NoResultsError, fn -> Api.get_tenant!(tenant.id) end end + end - test "delete_tenant_by_external_id/1 deletes the tenant", %{tenants: [tenant | _]} do + describe "delete_tenant_by_external_id/1" do + test "deletes the tenant" do + tenant = tenant_fixture() assert true == Api.delete_tenant_by_external_id(tenant.external_id) assert false == Api.delete_tenant_by_external_id("undef_tenant") assert_raise Ecto.NoResultsError, fn -> Api.get_tenant!(tenant.id) end end + end - test "change_tenant/1 returns a tenant changeset", %{tenants: [tenant | _]} do + describe "change_tenant/1" do + test "returns a tenant changeset", %{tenants: [tenant | _]} do assert %Ecto.Changeset{} = Api.change_tenant(tenant) end + end - test "list_extensions/1 ", %{tenants: tenants} do - assert length(Api.list_extensions()) == length(tenants) - end + test "list_extensions/1 ", %{tenants: tenants} do + assert length(Api.list_extensions()) == length(tenants) + end - test "preload_counters/1 ", %{tenants: [tenant | _]} do + describe "preload_counters/1" do + test "preloads counters for a given tenant ", %{tenants: [tenant | _]} do + tenant = Repo.reload!(tenant) assert Api.preload_counters(nil) == nil with_mocks([ @@ -179,8 +189,11 @@ defmodule Realtime.ApiTest do assert Api.preload_counters(nil, :any) == nil end + end - test "rename_settings_field/2", %{tenants: [tenant | _]} do + describe "rename_settings_field/2" do + test "renames setting fields" do + tenant = tenant_fixture() Api.rename_settings_field("poll_interval_ms", "poll_interval") assert %{extensions: [%{settings: %{"poll_interval" => _}}]} = tenant end diff --git a/test/realtime/cluster_strategy/postgres_test.exs b/test/realtime/cluster_strategy/postgres_test.exs index 31ed88f1b..a2226cf8e 100644 --- a/test/realtime/cluster_strategy/postgres_test.exs +++ b/test/realtime/cluster_strategy/postgres_test.exs @@ -45,7 +45,7 @@ defmodule Realtime.Cluster.Strategy.PostgresTest do defp opts do [ - hostname: "localhost", + hostname: "127.0.0.1", username: "postgres", password: "postgres", database: "realtime_test", diff --git a/test/realtime/database_test.exs b/test/realtime/database_test.exs index d6380cad5..a483a66ce 100644 --- a/test/realtime/database_test.exs +++ b/test/realtime/database_test.exs @@ -1,5 +1,5 @@ defmodule Realtime.DatabaseTest do - # async: false due to the deletion of the replication slot potentially affecting other tests + # async: false due to usage of mocks use Realtime.DataCase, async: false import ExUnit.CaptureLog @@ -9,29 +9,29 @@ defmodule Realtime.DatabaseTest do def handle_telemetry(event, metadata, _, pid: pid), do: send(pid, {event, metadata}) setup do - tenant = tenant_fixture() - :telemetry.attach(__MODULE__, [:realtime, :database, :transaction], &__MODULE__.handle_telemetry/4, pid: self()) - on_exit(fn -> :telemetry.detach(__MODULE__) end) - # Ensure no replication slot is present before the test - Cleanup.ensure_no_replication_slot() + tenant = Containers.checkout_tenant() + + on_exit(fn -> + :telemetry.detach(__MODULE__) + Containers.checkin_tenant(tenant) + end) %{tenant: tenant} end describe "check_tenant_connection/1" do setup context do + port = Enum.random(5500..9000) + extensions = [ %{ "type" => "postgres_cdc_rls", "settings" => %{ - "db_host" => "localhost", + "db_host" => "127.0.0.1", "db_name" => "postgres", "db_user" => "supabase_admin", "db_password" => "postgres", - "db_port" => "5433", - "poll_interval" => 100, - "poll_max_changes" => 100, - "poll_max_record_bytes" => 1_048_576, + "db_port" => "#{port}", "region" => "us-east-1", "ssl_enforced" => false, "db_pool" => Map.get(context, :db_pool), @@ -42,7 +42,8 @@ defmodule Realtime.DatabaseTest do ] tenant = tenant_fixture(%{extensions: extensions}) - + Containers.initialize(tenant, true) + on_exit(fn -> Containers.stop_container(tenant) end) %{tenant: tenant} end @@ -245,17 +246,18 @@ defmodule Realtime.DatabaseTest do test "returns struct with correct setup", %{tenant: tenant} do application_name = "realtime_connect" backoff = :stop - {:ok, ip_version} = Database.detect_ip_version("localhost") + {:ok, ip_version} = Database.detect_ip_version("127.0.0.1") socket_options = [ip_version] settings = Realtime.PostgresCdc.filter_settings("postgres_cdc_rls", tenant.extensions) settings = Database.from_settings(settings, application_name, backoff) + port = settings.port assert %Realtime.Database{ socket_options: ^socket_options, application_name: ^application_name, backoff_type: ^backoff, - hostname: "localhost", - port: 5433, + hostname: "127.0.0.1", + port: ^port, database: "postgres", username: "supabase_admin", password: "postgres", diff --git a/test/realtime/extensions/cdc_rls/cdc_rls_test.exs b/test/realtime/extensions/cdc_rls/cdc_rls_test.exs index 4eece24d0..b4f566a4c 100644 --- a/test/realtime/extensions/cdc_rls/cdc_rls_test.exs +++ b/test/realtime/extensions/cdc_rls/cdc_rls_test.exs @@ -1,5 +1,6 @@ defmodule Realtime.Extensions.CdcRlsTest do - use RealtimeWeb.ChannelCase + # async: false due to usage of dev_realtime and mocks + use RealtimeWeb.ChannelCase, async: false use RealtimeWeb.ConnCase import Mock diff --git a/test/realtime/extensions/cdc_rls/subscriptions_test.exs b/test/realtime/extensions/cdc_rls/subscriptions_test.exs index 4372f98ba..cbc10fc77 100644 --- a/test/realtime/extensions/cdc_rls/subscriptions_test.exs +++ b/test/realtime/extensions/cdc_rls/subscriptions_test.exs @@ -1,196 +1,121 @@ -defmodule Realtime.Extensions.CdcRlsSubscriptionsTest do +defmodule Realtime.Extensionsubscriptions.CdcRlsSubscriptionsTest do # async: false due to the fact that it uses the database use RealtimeWeb.ChannelCase, async: false doctest Extensions.PostgresCdcRls.Subscriptions - alias Extensions.PostgresCdcRls.Subscriptions, as: S - alias Postgrex, as: P + alias Extensions.PostgresCdcRls.Subscriptions + alias Realtime.Database + alias Realtime.Tenants - setup %{} do - repo = Application.get_env(:realtime, Realtime.Repo) + setup do + tenant = Tenants.get_tenant_by_external_id("dev_tenant") {:ok, conn} = - Postgrex.start_link( - hostname: repo[:hostname], - database: repo[:database], - password: repo[:password], - username: repo[:username] - ) + tenant + |> Database.from_tenant("realtime_rls") + |> Map.from_struct() + |> Keyword.new() + |> Postgrex.start_link() %{conn: conn} end test "create", %{conn: conn} do - S.delete_all(conn) + Subscriptions.delete_all(conn) - assert %Postgrex.Result{rows: [[0]]} = - P.query!(conn, "select count(*) from realtime.subscription", []) + assert %Postgrex.Result{rows: [[0]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", []) - params_list = [ - %{ - claims: %{ - "role" => "anon" - }, - id: UUID.uuid1(), - params: %{"event" => "*", "schema" => "public"} - } - ] + params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), params: %{"event" => "*", "schema" => "public"}}] assert {:ok, [%Postgrex.Result{}]} = - S.create(conn, "supabase_realtime_test", params_list, self(), self()) + Subscriptions.create(conn, "supabase_realtime_test", params_list, self(), self()) Process.sleep(500) - params_list = [ - %{ - claims: %{ - "role" => "anon" - }, - id: UUID.uuid1(), - params: %{"schema" => "public", "table" => "tenants"} - } - ] + params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), params: %{"schema" => "public", "table" => "test"}}] assert {:ok, [%Postgrex.Result{}]} = - S.create(conn, "supabase_realtime_test", params_list, self(), self()) + Subscriptions.create(conn, "supabase_realtime_test", params_list, self(), self()) Process.sleep(500) - params_list = [ - %{ - claims: %{ - "role" => "anon" - }, - id: UUID.uuid1(), - params: %{} - } - ] + params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), params: %{}}] assert {:error, "No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: %{}"} = - S.create(conn, "supabase_realtime_test", params_list, self(), self()) + Subscriptions.create(conn, "supabase_realtime_test", params_list, self(), self()) Process.sleep(500) - params_list = [ - %{ - claims: %{ - "role" => "anon" - }, - id: UUID.uuid1(), - params: %{ - "user_token" => "potato" - } - } - ] + params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), params: %{"user_token" => "potato"}}] assert {:error, "No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: "} = - S.create(conn, "supabase_realtime_test", params_list, self(), self()) + Subscriptions.create(conn, "supabase_realtime_test", params_list, self(), self()) Process.sleep(500) - params_list = [ - %{ - claims: %{ - "role" => "anon" - }, - id: UUID.uuid1(), - params: %{ - "auth_token" => "potato" - } - } - ] + params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), params: %{"auth_token" => "potato"}}] assert {:error, "No subscription params provided. Please provide at least a `schema` or `table` to subscribe to: "} = - S.create(conn, "supabase_realtime_test", params_list, self(), self()) + Subscriptions.create(conn, "supabase_realtime_test", params_list, self(), self()) Process.sleep(500) - %Postgrex.Result{rows: [[num]]} = - P.query!(conn, "select count(*) from realtime.subscription", []) - + %Postgrex.Result{rows: [[num]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", []) assert num != 0 end test "delete_all", %{conn: conn} do create_subscriptions(conn, 10) - assert {:ok, %P.Result{}} = S.delete_all(conn) - - assert %Postgrex.Result{rows: [[0]]} = - P.query!(conn, "select count(*) from realtime.subscription", []) + assert {:ok, %Postgrex.Result{}} = Subscriptions.delete_all(conn) + assert %Postgrex.Result{rows: [[0]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", []) end test "delete", %{conn: conn} do - S.delete_all(conn) + Subscriptions.delete_all(conn) id = UUID.uuid1() - bin_id = id |> UUID.string_to_binary!() - - params_list = [ - %{ - claims: %{ - "role" => "anon" - }, - id: id, - params: %{"event" => "*"} - } - ] + bin_id = UUID.string_to_binary!(id) - S.create(conn, "supabase_realtime_test", params_list, self(), self()) + params_list = [%{id: id, claims: %{"role" => "anon"}, params: %{"event" => "*"}}] + Subscriptions.create(conn, "supabase_realtime_test", params_list, self(), self()) Process.sleep(500) - assert {:ok, %P.Result{}} = S.delete(conn, bin_id) - - assert %Postgrex.Result{rows: [[0]]} = - P.query!(conn, "select count(*) from realtime.subscription", []) + assert {:ok, %Postgrex.Result{}} = Subscriptions.delete(conn, bin_id) + assert %Postgrex.Result{rows: [[0]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", []) end test "delete_multi", %{conn: conn} do - S.delete_all(conn) + Subscriptions.delete_all(conn) id1 = UUID.uuid1() - bin_id1 = id1 |> UUID.string_to_binary!() - id2 = UUID.uuid1() - bin_id2 = id2 |> UUID.string_to_binary!() + + bin_id2 = UUID.string_to_binary!(id2) + bin_id1 = UUID.string_to_binary!(id1) params_list = [ - %{ - claims: %{ - "role" => "anon" - }, - id: id1, - params: %{"event" => "*"} - }, - %{ - claims: %{ - "role" => "anon" - }, - id: id2, - params: %{"event" => "*"} - } + %{claims: %{"role" => "anon"}, id: id1, params: %{"event" => "*"}}, + %{claims: %{"role" => "anon"}, id: id2, params: %{"event" => "*"}} ] - S.create(conn, "supabase_realtime_test", params_list, self(), self()) + Subscriptions.create(conn, "supabase_realtime_test", params_list, self(), self()) Process.sleep(500) - assert {:ok, %P.Result{}} = S.delete_multi(conn, [bin_id1, bin_id2]) - - assert %Postgrex.Result{rows: [[0]]} = - P.query!(conn, "select count(*) from realtime.subscription", []) + assert {:ok, %Postgrex.Result{}} = Subscriptions.delete_multi(conn, [bin_id1, bin_id2]) + assert %Postgrex.Result{rows: [[0]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", []) end test "maybe_delete_all", %{conn: conn} do - S.delete_all(conn) + Subscriptions.delete_all(conn) create_subscriptions(conn, 10) - assert {:ok, %P.Result{}} = S.maybe_delete_all(conn) - assert %Postgrex.Result{rows: [[0]]} = - P.query!(conn, "select count(*) from realtime.subscription", []) + assert {:ok, %Postgrex.Result{}} = Subscriptions.maybe_delete_all(conn) + assert %Postgrex.Result{rows: [[0]]} = Postgrex.query!(conn, "select count(*) from realtime.subscription", []) end test "fetch_publication_tables", %{conn: conn} do - tables = S.fetch_publication_tables(conn, "supabase_realtime_test") + tables = Subscriptions.fetch_publication_tables(conn, "supabase_realtime_test") assert tables[{"*"}] != nil end @@ -203,7 +128,7 @@ defmodule Realtime.Extensions.CdcRlsSubscriptionsTest do "exp" => 1_974_176_791, "iat" => 1_658_600_791, "iss" => "supabase", - "ref" => "localhost", + "ref" => "127.0.0.1", "role" => "anon" }, id: UUID.uuid1(), @@ -213,7 +138,7 @@ defmodule Realtime.Extensions.CdcRlsSubscriptionsTest do ] end) - S.create(conn, "supabase_realtime_test", params_list, self(), self()) + Subscriptions.create(conn, "supabase_realtime_test", params_list, self(), self()) Process.sleep(500) end end diff --git a/test/realtime/helpers_test.exs b/test/realtime/helpers_test.exs index 102356b9d..d71e443b1 100644 --- a/test/realtime/helpers_test.exs +++ b/test/realtime/helpers_test.exs @@ -1,5 +1,4 @@ defmodule Realtime.HelpersTest do - use Realtime.DataCase, async: false - # async: false due to the deletion of the replication slot potentially affecting other tests + use Realtime.DataCase doctest Realtime.Helpers end diff --git a/test/realtime/messages_test.exs b/test/realtime/messages_test.exs index dc816cded..c1d18e78f 100644 --- a/test/realtime/messages_test.exs +++ b/test/realtime/messages_test.exs @@ -5,14 +5,12 @@ defmodule Realtime.MessagesTest do alias Realtime.Database alias Realtime.Messages alias Realtime.Repo - alias Realtime.Tenants.Migrations setup do - tenant = tenant_fixture() - Migrations.run_migrations(tenant) - + tenant = Containers.checkout_tenant(true) + on_exit(fn -> Containers.checkin_tenant(tenant) end) {:ok, conn} = Database.connect(tenant, "realtime_test", :stop) - clean_table(conn, "realtime", "messages") + date_start = Date.utc_today() |> Date.add(-10) date_end = Date.utc_today() create_messages_partitions(conn, date_start, date_end) diff --git a/test/realtime/metrics_cleaner_test.exs b/test/realtime/metrics_cleaner_test.exs index 5abe7fd9f..cad8eb674 100644 --- a/test/realtime/metrics_cleaner_test.exs +++ b/test/realtime/metrics_cleaner_test.exs @@ -1,14 +1,16 @@ defmodule Realtime.MetricsCleanerTest do + # async: false due to potentially polluting metrics with other tenant metrics from other tests use Realtime.DataCase, async: false alias Realtime.MetricsCleaner setup do interval = Application.get_env(:realtime, :metrics_cleaner_schedule_timer_in_ms) Application.put_env(:realtime, :metrics_cleaner_schedule_timer_in_ms, 100) - tenant = tenant_fixture() + tenant = Containers.checkout_tenant(true) on_exit(fn -> Application.put_env(:realtime, :metrics_cleaner_schedule_timer_in_ms, interval) + Containers.checkin_tenant(tenant) end) %{tenant: tenant} diff --git a/test/realtime/nodes_test.exs b/test/realtime/nodes_test.exs index d5b9875c9..5e0c8b21e 100644 --- a/test/realtime/nodes_test.exs +++ b/test/realtime/nodes_test.exs @@ -1,13 +1,14 @@ defmodule Realtime.NodesTest do - use Realtime.DataCase + # async: false as we will be testing :syn logic which can be impacted by other tests and usage of mocks + use Realtime.DataCase, async: false alias Realtime.Nodes import Mock describe "get_node_for_tenant/1" do setup do - tenant = tenant_fixture() + tenant = Containers.checkout_tenant() region = tenant.extensions |> hd() |> Map.get(:settings) |> Map.get("region") - + on_exit(fn -> Containers.checkin_tenant(tenant) end) %{tenant: tenant, region: region} end diff --git a/test/realtime/repo_test.exs b/test/realtime/repo_test.exs index d7c0fbbdd..b8590a0ee 100644 --- a/test/realtime/repo_test.exs +++ b/test/realtime/repo_test.exs @@ -1,133 +1,17 @@ defmodule Realtime.RepoTest do - # async: false due to the fact that multiple operations against the database will use the same connection - use Realtime.DataCase, async: false + use Realtime.DataCase, async: true import Ecto.Query alias Realtime.Api.Message alias Realtime.Repo alias Realtime.Database - alias Realtime.Tenants.Migrations setup do - tenant = tenant_fixture() + tenant = Containers.checkout_tenant(true) + on_exit(fn -> Containers.checkin_tenant(tenant) end) {:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop) - - Migrations.run_migrations(tenant) - - clean_table(db_conn, "realtime", "messages") - %{db_conn: db_conn, tenant: tenant} - end - - describe "with_dynamic_repo/2" do - test "starts a repo with the given config and kills it in the end of the command" do - test_pid = self() - - Repo.with_dynamic_repo(db_config(), fn repo -> - Ecto.Adapters.SQL.query(repo, "SELECT 1", []) - send(test_pid, repo) - send(test_pid, :query_success) - end) - - repo_pid = - receive do - repo_pid -> repo_pid - end - - assert_receive :query_success - assert Process.alive?(repo_pid) == false - end - - test "kills repo pid when we kill parent pid" do - test_pid = self() - - parent_pid = - spawn(fn -> - Repo.with_dynamic_repo(db_config(), fn repo -> - send(test_pid, repo) - Ecto.Adapters.SQL.query(repo, "SELECT pg_sleep(1)", []) - raise("Should not run query") - end) - end) - - repo_pid = - receive do - repo_pid -> repo_pid - end - - true = Process.exit(parent_pid, :kill) - Process.sleep(1500) - assert Process.alive?(repo_pid) == false - end - - test "concurrent repos can coexist" do - test_pid = self() - - pid_1 = - spawn(fn -> - Repo.with_dynamic_repo(db_config(), fn repo -> - send(test_pid, repo) - Ecto.Adapters.SQL.query(repo, "SELECT pg_sleep(1)", []) - send(test_pid, :query_success) - end) - end) - - pid_2 = - spawn(fn -> - Repo.with_dynamic_repo(db_config(), fn repo -> - send(test_pid, repo) - Ecto.Adapters.SQL.query(repo, "SELECT pg_sleep(1)", []) - send(test_pid, :query_success) - end) - end) - - repo_pid_1 = - receive do - repo_pid -> repo_pid - end - - repo_pid_2 = - receive do - repo_pid -> repo_pid - end - - assert Process.alive?(repo_pid_1) == true - assert Process.alive?(repo_pid_2) == true - - assert_receive :query_success, 2000 - assert_receive :query_success, 2000 - - Process.sleep(100) - assert Process.alive?(repo_pid_1) == false - assert Process.alive?(repo_pid_2) == false - assert Process.alive?(pid_1) == false - assert Process.alive?(pid_2) == false - end - - test "on exception from query" do - test_pid = self() - - try do - spawn(fn -> - Repo.with_dynamic_repo(db_config(), fn repo -> - send(test_pid, repo) - Process.sleep(100) - raise "💣" - end) - end) - catch - _ -> :ok - end - - repo_pid = - receive do - repo_pid -> repo_pid - end - - assert Process.alive?(repo_pid) == true - Process.sleep(300) - assert Process.alive?(repo_pid) == false - end + %{tenant: tenant, db_conn: db_conn} end describe "all/3" do @@ -179,10 +63,6 @@ defmodule Realtime.RepoTest do end describe "insert/3" do - setup %{db_conn: db_conn} do - Realtime.Tenants.Migrations.create_partitions(db_conn) - end - test "inserts a new entry with a given changeset and returns struct", %{db_conn: db_conn} do changeset = Message.changeset(%Message{}, %{topic: "foo", extension: :presence}) @@ -325,11 +205,4 @@ defmodule Realtime.RepoTest do assert {:error, :postgrex_exception} = Repo.update(db_conn, changeset, Message) end end - - defp db_config do - tenant_fixture() - |> Realtime.Database.from_tenant("realtime_test") - |> Map.to_list() - |> Keyword.new() - end end diff --git a/test/realtime/signal_handler_test.exs b/test/realtime/signal_handler_test.exs index 184d79d69..e694f0a7a 100644 --- a/test/realtime/signal_handler_test.exs +++ b/test/realtime/signal_handler_test.exs @@ -2,10 +2,9 @@ defmodule Realtime.SignalHandlerTest do use ExUnit.Case import ExUnit.CaptureLog alias Realtime.SignalHandler - import Mock defmodule FakeHandler do - def handle_event(:sigterm, _state), do: :ok + def handle_event(:sigterm, _state), do: send(self(), :ok) end setup do @@ -16,14 +15,12 @@ defmodule Realtime.SignalHandlerTest do describe "signal handling" do test "sends signal to handler_mod" do - with_mock FakeHandler, handle_event: fn :sigterm, _state -> :ok end do - {:ok, state} = SignalHandler.init({%{handler_mod: FakeHandler}, :ok}) + {:ok, state} = SignalHandler.init({%{handler_mod: FakeHandler}, :ok}) - assert capture_log(fn -> SignalHandler.handle_event(:sigterm, state) end) =~ - "SignalHandler: :sigterm received" + assert capture_log(fn -> SignalHandler.handle_event(:sigterm, state) end) =~ + "SignalHandler: :sigterm received" - assert_called_exactly(FakeHandler.handle_event(:sigterm, :_), 1) - end + assert_receive :ok end end diff --git a/test/realtime/tenants/authorization_test.exs b/test/realtime/tenants/authorization_test.exs index 83688ccfe..289fb5db2 100644 --- a/test/realtime/tenants/authorization_test.exs +++ b/test/realtime/tenants/authorization_test.exs @@ -1,5 +1,5 @@ defmodule Realtime.Tenants.AuthorizationTest do - # Needs to be false due to some conflicts when fetching connection from the pool since this use Postgrex directly + # async: false due to usege of mocks use RealtimeWeb.ConnCase, async: false require Phoenix.ChannelTest @@ -13,7 +13,6 @@ defmodule Realtime.Tenants.AuthorizationTest do alias Realtime.Tenants.Authorization.Policies alias Realtime.Tenants.Authorization.Policies.BroadcastPolicies alias Realtime.Tenants.Authorization.Policies.PresencePolicies - alias Realtime.Tenants.Migrations alias RealtimeWeb.Joken.CurrentTime setup [:rls_context] @@ -366,13 +365,10 @@ defmodule Realtime.Tenants.AuthorizationTest do end def rls_context(context) do - start_supervised!(CurrentTime.Mock) - tenant = tenant_fixture() - :ok = Migrations.run_migrations(tenant) - + start_supervised(CurrentTime.Mock) + tenant = Containers.checkout_tenant(true) + on_exit(fn -> Containers.checkin_tenant(tenant) end) {:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop) - - clean_table(db_conn, "realtime", "messages") topic = random_string() create_rls_policies(db_conn, context.policies, %{topic: topic}) diff --git a/test/realtime/tenants/cache_supervisor_test.exs b/test/realtime/tenants/cache_supervisor_test.exs index 533a108fe..d82463bc1 100644 --- a/test/realtime/tenants/cache_supervisor_test.exs +++ b/test/realtime/tenants/cache_supervisor_test.exs @@ -1,5 +1,5 @@ defmodule Realtime.Tenants.CacheSupervisorTest do - use Realtime.DataCase, async: false + use Realtime.DataCase, async: true alias Realtime.Api.Tenant alias Realtime.Tenants.Cache diff --git a/test/realtime/tenants/cache_test.exs b/test/realtime/tenants/cache_test.exs index e6aaa5457..35c0536fd 100644 --- a/test/realtime/tenants/cache_test.exs +++ b/test/realtime/tenants/cache_test.exs @@ -1,59 +1,56 @@ defmodule Realtime.Tenants.CacheTest do - use Realtime.DataCase + use Realtime.DataCase, async: true alias Realtime.Api + alias Realtime.Tenants.Cache alias Realtime.Tenants setup do - %{tenant: tenant_fixture()} + {:ok, tenant: tenant_fixture()} end describe "get_tenant_by_external_id/1" do test "tenants cache returns a cached result", %{tenant: tenant} do external_id = tenant.external_id - - assert %Api.Tenant{name: "localhost"} = Tenants.Cache.get_tenant_by_external_id(external_id) - + assert %Api.Tenant{name: "tenant"} = Cache.get_tenant_by_external_id(external_id) Api.update_tenant(tenant, %{name: "new name"}) - assert %Api.Tenant{name: "new name"} = Tenants.get_tenant_by_external_id(external_id) - - assert %Api.Tenant{name: "localhost"} = Tenants.Cache.get_tenant_by_external_id(external_id) + assert %Api.Tenant{name: "tenant"} = Cache.get_tenant_by_external_id(external_id) end end describe "invalidate_tenant_cache/1" do test "invalidates the cache given a tenant_id", %{tenant: tenant} do external_id = tenant.external_id - assert %Api.Tenant{suspend: false} = Tenants.Cache.get_tenant_by_external_id(external_id) + assert %Api.Tenant{suspend: false} = Cache.get_tenant_by_external_id(external_id) # Update a tenant tenant |> Realtime.Api.Tenant.changeset(%{suspend: true}) |> Realtime.Repo.update!() # Cache showing old value - assert %Api.Tenant{suspend: false} = Tenants.Cache.get_tenant_by_external_id(external_id) + assert %Api.Tenant{suspend: false} = Cache.get_tenant_by_external_id(external_id) # Invalidate cache - Tenants.Cache.invalidate_tenant_cache(external_id) - assert %Api.Tenant{suspend: true} = Tenants.Cache.get_tenant_by_external_id(external_id) + Cache.invalidate_tenant_cache(external_id) + assert %Api.Tenant{suspend: true} = Cache.get_tenant_by_external_id(external_id) end end describe "distributed_invalidate_tenant_cache/1" do test "invalidates the cache given a tenant_id", %{tenant: tenant} do external_id = tenant.external_id - assert %Api.Tenant{suspend: false} = Tenants.Cache.get_tenant_by_external_id(external_id) + assert %Api.Tenant{suspend: false} = Cache.get_tenant_by_external_id(external_id) # Delete tenant Realtime.Repo.delete(tenant) # Cache showing non existing tenant - assert %Api.Tenant{suspend: false} = Tenants.Cache.get_tenant_by_external_id(external_id) + assert %Api.Tenant{suspend: false} = Cache.get_tenant_by_external_id(external_id) # Invalidate cache - Tenants.Cache.distributed_invalidate_tenant_cache(external_id) + Cache.distributed_invalidate_tenant_cache(external_id) Process.sleep(500) - refute Tenants.Cache.get_tenant_by_external_id(external_id) + refute Cache.get_tenant_by_external_id(external_id) end end end diff --git a/test/realtime/tenants/connect_test.exs b/test/realtime/tenants/connect_test.exs index 2926101ca..22381a962 100644 --- a/test/realtime/tenants/connect_test.exs +++ b/test/realtime/tenants/connect_test.exs @@ -1,5 +1,5 @@ defmodule Realtime.Tenants.ConnectTest do - # async: false due to the fact that multiple operations against the database will use the same connection + # async: false due to the fact that we are checking ets tables for user tracking and usage of mocks use Realtime.DataCase, async: false import ExUnit.CaptureLog import Mock @@ -12,43 +12,47 @@ defmodule Realtime.Tenants.ConnectTest do alias Realtime.UsersCounter setup do + tenant = Containers.checkout_tenant() :ets.delete_all_objects(Connect) - tenant = tenant_fixture() - Cleanup.ensure_no_replication_slot() + on_exit(fn -> Containers.checkin_tenant(tenant) end) + %{tenant: tenant} end describe "lookup_or_start_connection/1" do - test "if tenant exists and connected, returns the db connection and tracks it in ets", %{ - tenant: tenant - } do + test "if tenant exists and connected, returns the db connection and tracks it in ets", %{tenant: tenant} do assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id) - Process.sleep(100) + Process.sleep(500) assert is_pid(db_conn) - Connect.shutdown(tenant.external_id) + assert Connect.shutdown(tenant.external_id) == :ok end test "tracks multiple users that connect and disconnect" do expected = - for _ <- 1..10 do + for _ <- 1..2 do tenant = tenant_fixture() + tenant = Containers.initialize(tenant, true, true) + on_exit(fn -> Containers.stop_container(tenant) end) + assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id) - Process.sleep(100) + Process.sleep(500) + assert is_pid(db_conn) Connect.shutdown(tenant.external_id) - {tenant.external_id} + + tenant.external_id end - result = :ets.select(Connect, [{:"$1", [], [:"$1"]}]) |> Enum.sort() + result = :ets.select(Connect, [{{:"$1"}, [], [:"$1"]}]) |> Enum.sort() expected = Enum.sort(expected) assert result == expected end test "on database disconnect, returns new connection", %{tenant: tenant} do assert {:ok, old_conn} = Connect.lookup_or_start_connection(tenant.external_id) - Process.sleep(500) - GenServer.stop(old_conn) - Process.sleep(500) + Process.sleep(1000) + Connect.shutdown(tenant.external_id) + Process.sleep(1000) assert {:ok, new_conn} = Connect.lookup_or_start_connection(tenant.external_id) @@ -59,20 +63,22 @@ defmodule Realtime.Tenants.ConnectTest do end test "if tenant exists but unable to connect, returns error" do + port = Enum.random(5500..9000) + extensions = [ %{ "type" => "postgres_cdc_rls", "settings" => %{ - "db_host" => "localhost", - "db_name" => "false", - "db_user" => "false", - "db_password" => "false", - "db_port" => "5433", + "db_host" => "127.0.0.1", + "db_name" => "postgres", + "db_user" => "postgres", + "db_password" => "postgres", + "db_port" => "#{port}", "poll_interval" => 100, "poll_max_changes" => 100, "poll_max_record_bytes" => 1_048_576, "region" => "us-east-1", - "ssl_enforced" => false + "ssl_enforced" => true } } ] @@ -87,9 +93,7 @@ defmodule Realtime.Tenants.ConnectTest do assert {:error, :tenant_not_found} = Connect.lookup_or_start_connection("none") end - test "if no users are connected to a tenant channel, stop the connection", %{ - tenant: %{external_id: tenant_id} - } do + test "if no users are connected to a tenant channel, stop the connection", %{tenant: %{external_id: tenant_id}} do {:ok, db_conn} = Connect.lookup_or_start_connection(tenant_id, check_connected_user_interval: 100) @@ -104,9 +108,7 @@ defmodule Realtime.Tenants.ConnectTest do Connect.shutdown(tenant_id) end - test "if users are connected to a tenant channel, keep the connection", %{ - tenant: %{external_id: tenant_id} - } do + test "if users are connected to a tenant channel, keep the connection", %{tenant: %{external_id: tenant_id}} do UsersCounter.add(self(), tenant_id) {:ok, db_conn} = @@ -122,32 +124,33 @@ defmodule Realtime.Tenants.ConnectTest do Connect.shutdown(tenant_id) end - test "connection is killed after user leaving", %{ - tenant: %{external_id: tenant_id} - } do - UsersCounter.add(self(), tenant_id) + test "connection is killed after user leaving" do + tenant = tenant_fixture() + %{external_id: external_id} = Containers.initialize(tenant, true, true) + on_exit(fn -> Containers.stop_container(tenant) end) - {:ok, db_conn} = - Connect.lookup_or_start_connection(tenant_id, check_connected_user_interval: 10) + UsersCounter.add(self(), external_id) + + {:ok, db_conn} = Connect.lookup_or_start_connection(external_id, check_connected_user_interval: 10) - assert {_pid, %{conn: _conn_pid}} = :syn.lookup(Connect, tenant_id) + assert {_pid, %{conn: ^db_conn}} = :syn.lookup(Connect, external_id) Process.sleep(1000) - :syn.leave(:users, tenant_id, self()) + :syn.leave(:users, external_id, self()) Process.sleep(1000) - assert :undefined = :syn.lookup(Connect, tenant_id) + assert :undefined = :syn.lookup(Connect, external_id) refute Process.alive?(db_conn) - Connect.shutdown(tenant_id) + Connect.shutdown(external_id) end test "error if tenant is suspended" do tenant = tenant_fixture(suspend: true) - assert {:error, :tenant_suspended} = Connect.lookup_or_start_connection(tenant.external_id) end test "handles tenant suspension and unsuspension in a reactive way" do tenant = tenant_fixture() - + tenant = Containers.initialize(tenant, true, true) + on_exit(fn -> Containers.stop_container(tenant) end) assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id) Process.sleep(500) @@ -180,17 +183,19 @@ defmodule Realtime.Tenants.ConnectTest do end test "properly handles of failing calls by avoid creating too many connections" do + port = Enum.random(5500..6000) + tenant = tenant_fixture(%{ extensions: [ %{ "type" => "postgres_cdc_rls", "settings" => %{ - "db_host" => "localhost", + "db_host" => "127.0.0.1", "db_name" => "postgres", "db_user" => "supabase_admin", "db_password" => "postgres", - "db_port" => "5433", + "db_port" => "#{port}", "poll_interval" => 100, "poll_max_changes" => 100, "poll_max_record_bytes" => 1_048_576, @@ -201,6 +206,8 @@ defmodule Realtime.Tenants.ConnectTest do ] }) + tenant = Containers.initialize(tenant) + Enum.each(1..10, fn _ -> Task.start(fn -> Connect.lookup_or_start_connection(tenant.external_id) @@ -215,7 +222,7 @@ defmodule Realtime.Tenants.ConnectTest do test "on migrations failure, stop the process", %{tenant: tenant} do with_mock Realtime.Tenants.Migrations, [], run_migrations: fn _ -> raise("error") end do assert {:ok, pid} = Connect.lookup_or_start_connection(tenant.external_id) - Process.sleep(200) + Process.sleep(1000) refute Process.alive?(pid) assert_called(Realtime.Tenants.Migrations.run_migrations(tenant)) end @@ -276,9 +283,9 @@ defmodule Realtime.Tenants.ConnectTest do assert Process.alive?(old_replication_connection_pid) assert Process.alive?(old_listen_connection_pid) - System.cmd("docker", ["stop", "tenant-db"]) + System.cmd("docker", ["stop", "realtime-test-#{tenant.external_id}"]) Process.sleep(500) - System.cmd("docker", ["start", "tenant-db"]) + System.cmd("docker", ["start", "realtime-test-#{tenant.external_id}"]) Process.sleep(3000) refute Process.alive?(old_pid) @@ -289,12 +296,16 @@ defmodule Realtime.Tenants.ConnectTest do assert Listen.whereis(tenant.external_id) == nil end - test "handles max_wal_senders by logging the correct operational code", %{tenant: tenant} do - opts = Database.from_tenant(tenant, "realtime_test", :stop) |> Database.opts() + test "handles max_wal_senders by logging the correct operational code" do + tenant = tenant_fixture() + tenant = Containers.initialize(tenant, true, true) + on_exit(fn -> Containers.stop_container(tenant) end) + + opts = tenant |> Database.from_tenant("realtime_test", :stop) |> Database.opts() # This creates a loop of errors that occupies all WAL senders and lets us test the error handling pids = - for i <- 0..4 do + for i <- 0..10 do replication_slot_opts = %PostgresReplication{ connection_opts: opts, @@ -311,7 +322,7 @@ defmodule Realtime.Tenants.ConnectTest do end on_exit(fn -> - Enum.each(pids, &Process.exit(&1, :kill)) + Enum.each(pids, &Process.exit(&1, :normal)) Process.sleep(2000) end) @@ -345,13 +356,13 @@ defmodule Realtime.Tenants.ConnectTest do test "shutdowns all associated connections", %{tenant: tenant} do assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id) assert Process.alive?(db_conn) - Process.sleep(300) + Process.sleep(1000) assert Process.alive?(Connect.whereis(tenant.external_id)) assert Process.alive?(ReplicationConnection.whereis(tenant.external_id)) assert Process.alive?(Listen.whereis(tenant.external_id)) Connect.shutdown(tenant.external_id) - Process.sleep(200) + Process.sleep(1000) refute Connect.whereis(tenant.external_id) refute ReplicationConnection.whereis(tenant.external_id) refute Listen.whereis(tenant.external_id) @@ -362,15 +373,17 @@ defmodule Realtime.Tenants.ConnectTest do end test "tenant not able to connect if database has not enough connections" do + port = Enum.random(5500..9000) + extensions = [ %{ "type" => "postgres_cdc_rls", "settings" => %{ - "db_host" => "localhost", + "db_host" => "127.0.0.1", "db_name" => "postgres", "db_user" => "supabase_admin", "db_password" => "postgres", - "db_port" => "5433", + "db_port" => "#{port}", "poll_interval" => 100, "poll_max_changes" => 100, "poll_max_record_bytes" => 1_048_576, @@ -384,6 +397,7 @@ defmodule Realtime.Tenants.ConnectTest do ] tenant = tenant_fixture(%{extensions: extensions}) + tenant = Containers.initialize(tenant) assert {:error, :tenant_db_too_many_connections} = Connect.lookup_or_start_connection(tenant.external_id) diff --git a/test/realtime/tenants/janitor_test.exs b/test/realtime/tenants/janitor_test.exs index d79c96c2d..b9a2bad78 100644 --- a/test/realtime/tenants/janitor_test.exs +++ b/test/realtime/tenants/janitor_test.exs @@ -1,12 +1,11 @@ defmodule Realtime.Tenants.JanitorTest do - # async: false due to using database process + # async: false due to the fact that we're checking ets tables that can be modified by other tests and we are using mocks use Realtime.DataCase, async: false import Mock import ExUnit.CaptureLog alias Realtime.Api.Message - alias Realtime.Api.Tenant alias Realtime.Database alias Realtime.Repo alias Realtime.Tenants.Janitor @@ -15,7 +14,6 @@ defmodule Realtime.Tenants.JanitorTest do setup do :ets.delete_all_objects(Connect) - dev_tenant = Tenant |> Repo.all() |> hd() timer = Application.get_env(:realtime, :janitor_schedule_timer) Application.put_env(:realtime, :janitor_schedule_timer, 200) @@ -24,16 +22,14 @@ defmodule Realtime.Tenants.JanitorTest do tenants = Enum.map( - [ - tenant_fixture(), - dev_tenant - ], + [tenant_fixture(), tenant_fixture()], fn tenant -> + Containers.initialize(tenant, true, true) + on_exit(fn -> Containers.stop_container(tenant) end) + tenant = Repo.preload(tenant, :extensions) Connect.lookup_or_start_connection(tenant.external_id) - Process.sleep(250) - {:ok, conn} = Database.connect(tenant, "realtime_test", :stop) - clean_table(conn, "realtime", "messages") + Process.sleep(500) tenant end ) @@ -134,15 +130,17 @@ defmodule Realtime.Tenants.JanitorTest do end test "logs error if fails to connect to tenant" do + port = Enum.random(5500..8000) + extensions = [ %{ "type" => "postgres_cdc_rls", "settings" => %{ - "db_host" => "localhost", + "db_host" => "127.0.0.1", "db_name" => "postgres", "db_user" => "supabase_admin", - "db_password" => "bad", - "db_port" => "5433", + "db_password" => "postgres", + "db_port" => "#{port}", "poll_interval" => 100, "poll_max_changes" => 100, "poll_max_record_bytes" => 1_048_576, diff --git a/test/realtime/tenants/listen_test.exs b/test/realtime/tenants/listen_test.exs index 3b6667dcf..682d38337 100644 --- a/test/realtime/tenants/listen_test.exs +++ b/test/realtime/tenants/listen_test.exs @@ -1,72 +1,55 @@ defmodule Realtime.Tenants.ListenTest do - # async: false due to the fact that it's doing Postgres NOTIFY and could interfere with other tests - use Realtime.DataCase, async: false + use Realtime.DataCase, async: true import ExUnit.CaptureLog - import Mock - - alias Realtime.GenCounter - alias Realtime.RateCounter alias Realtime.Tenants.Listen alias Realtime.Tenants.Migrations alias Realtime.Database - alias RealtimeWeb.Endpoint describe("start/1") do setup do start_supervised(RealtimeWeb.Joken.CurrentTime.Mock) - start_supervised(Realtime.RateCounter.DynamicSupervisor) - start_supervised(Realtime.GenCounter.DynamicSupervisor) + start_supervised(Realtime.RateCounter) + start_supervised(Realtime.GenCounter) - tenant = tenant_fixture() - RateCounter.new({:channel, :events, tenant.external_id}) + tenant = Containers.checkout_tenant(true) + on_exit(fn -> Containers.checkin_tenant(tenant) end) - {:ok, listen_conn} = Listen.start(tenant, self()) + {:ok, _} = Listen.start(tenant, self()) {:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop) Migrations.run_migrations(tenant) - on_exit(fn -> - Process.exit(listen_conn, :normal) - Process.exit(db_conn, :normal) - end) - {:ok, tenant: tenant, db_conn: db_conn} end test "on realtime.send error, notify will capture and log error", %{db_conn: db_conn} do - with_mocks [ - {Endpoint, [:passthrough], broadcast_from: fn _, _, _, _ -> :ok end}, - {GenCounter, [:passthrough], add: fn _ -> :ok end}, - {RateCounter, [:passthrough], get: fn _ -> {:ok, %{avg: 0}} end} - ] do - assert capture_log(fn -> - Postgrex.query!( - db_conn, - """ - DO $$ - BEGIN - INSERT INTO realtime.messages (payload, event, topic, private, extension, inserted_at) VALUES (null, 'event', 'topic', false, 'broadcast', NOW() - INTERVAL '10 days'); - EXCEPTION - WHEN OTHERS THEN - PERFORM pg_notify( - 'realtime:system', - jsonb_build_object('error', SQLERRM , 'function', 'realtime.send', 'event', 'event', 'topic', 'topic', 'private', false )::text - ); - END - $$; - """, - [] - ) - - Process.sleep(100) - end) =~ "FailedSendFromDatabase" - end + assert capture_log(fn -> + Postgrex.query!( + db_conn, + """ + DO $$ + BEGIN + INSERT INTO realtime.messages (payload, event, topic, private, extension, inserted_at) VALUES (null, 'event', 'topic', false, 'broadcast', NOW() - INTERVAL '10 days'); + EXCEPTION + WHEN OTHERS THEN + PERFORM pg_notify( + 'realtime:system', + jsonb_build_object('error', SQLERRM , 'function', 'realtime.send', 'event', 'event', 'topic', 'topic', 'private', false )::text + ); + END + $$; + """, + [] + ) + + Process.sleep(100) + end) =~ "FailedSendFromDatabase" end end describe "whereis/1" do test "returns pid if exists" do - tenant = tenant_fixture() + tenant = Containers.checkout_tenant() Listen.start(tenant, self()) assert Listen.whereis(tenant.external_id) end diff --git a/test/realtime/tenants/migrations_test.exs b/test/realtime/tenants/migrations_test.exs index 53fd8012e..6f62bae1d 100644 --- a/test/realtime/tenants/migrations_test.exs +++ b/test/realtime/tenants/migrations_test.exs @@ -1,27 +1,19 @@ defmodule Realtime.Tenants.MigrationsTest do - # async: false due to the fact that we're dropping database migrations - use Realtime.DataCase, async: false + use Realtime.DataCase, async: true - alias Realtime.Database alias Realtime.Tenants.Migrations describe "run_migrations/1" do setup do - tenant = tenant_fixture() + tenant = Containers.checkout_tenant() + on_exit(fn -> Containers.checkin_tenant(tenant) end) %{tenant: tenant} end test "migrations for a given tenant only run once", %{tenant: tenant} do - {:ok, conn} = Database.connect(tenant, "realtime_test", :stop) - - Postgrex.query!(conn, "DROP SCHEMA realtime CASCADE", []) - Postgrex.query!(conn, "CREATE SCHEMA realtime", []) - res = for _ <- 0..10 do - Task.async(fn -> - Migrations.run_migrations(tenant) - end) + Task.async(fn -> Migrations.run_migrations(tenant) end) end |> Task.await_many() |> Enum.uniq() diff --git a/test/realtime/tenants/replication_connection_test.exs b/test/realtime/tenants/replication_connection_test.exs index da3759497..c1901a20a 100644 --- a/test/realtime/tenants/replication_connection_test.exs +++ b/test/realtime/tenants/replication_connection_test.exs @@ -1,5 +1,5 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do - # async: false due to the fact that we're using the database to intercept messages created which will interfer with other tests + # async: false due to the usage of mocks use Realtime.DataCase, async: false import ExUnit.CaptureLog @@ -9,51 +9,40 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do alias Realtime.Tenants.ReplicationConnection alias Realtime.Database alias Realtime.Tenants.BatchBroadcast - alias Realtime.Tenants.Migrations setup do - Cleanup.ensure_no_replication_slot() slot = Application.get_env(:realtime, :slot_name_suffix) Application.put_env(:realtime, :slot_name_suffix, "test") - start_supervised(Realtime.Tenants.CacheSupervisor) - tenant = tenant_fixture() - Migrations.run_migrations(tenant) + tenant = Containers.checkout_tenant(true) - {:ok, conn} = Database.connect(tenant, "realtime_test", :stop) - clean_table(conn, "realtime", "messages") - - publication = - ReplicationConnection.publication_name(%ReplicationConnection{ - tenant_id: tenant.external_id, - schema: "realtime", - table: "messages" - }) - - Postgrex.query(conn, "DROP PUBLICATION #{publication}", []) - - on_exit(fn -> Application.put_env(:realtime, :slot_name_suffix, slot) end) + on_exit(fn -> + Application.put_env(:realtime, :slot_name_suffix, slot) + Containers.checkin_tenant(tenant) + end) %{tenant: tenant} end test "fails if tenant connection is invalid" do + port = Enum.random(5500..9000) + tenant = tenant_fixture(%{ "extensions" => [ %{ "type" => "postgres_cdc_rls", "settings" => %{ - "db_host" => "localhost", + "db_host" => "127.0.0.1", "db_name" => "postgres", "db_user" => "supabase_admin", - "db_password" => "bad", - "db_port" => "5433", + "db_password" => "postgres", + "db_port" => "#{port}", "poll_interval" => 100, "poll_max_changes" => 100, "poll_max_record_bytes" => 1_048_576, "region" => "us-east-1", - "ssl_enforced" => false + "ssl_enforced" => true } } ] @@ -64,31 +53,14 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do end) =~ "UnableToStartHandler" end - test_with_mock "starts a handler for the tenant and broadcasts for single insert", - BatchBroadcast, - broadcast: fn _, _, _, _ -> :ok end do - tenant = tenant_fixture() - - {:ok, _pid} = ReplicationConnection.start(tenant, self()) - - total_messages = 5 - # Works with one insert per transaction - for _ <- 1..total_messages do - message_fixture(tenant, %{ - "topic" => random_string(), - "private" => true, - "event" => "INSERT", - "payload" => %{"value" => random_string()} - }) - end + test "starts a handler for the tenant and broadcasts for single insert", %{tenant: tenant} do + with_mock BatchBroadcast, broadcast: fn _, _, _, _ -> :ok end do + {:ok, _pid} = ReplicationConnection.start(tenant, self()) - Process.sleep(500) - - assert_called_exactly(BatchBroadcast.broadcast(nil, tenant, :_, :_), total_messages) - # Works with batch inserts - messages = + total_messages = 5 + # Works with one insert per transaction for _ <- 1..total_messages do - Message.changeset(%Message{}, %{ + message_fixture(tenant, %{ "topic" => random_string(), "private" => true, "event" => "INSERT", @@ -96,11 +68,26 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do }) end - Database.connect(tenant, "realtime_test", :stop) - Realtime.Repo.insert_all_entries(Message, messages, Message) - Process.sleep(500) + Process.sleep(500) + + assert_called_exactly(BatchBroadcast.broadcast(nil, tenant, :_, :_), total_messages) + # Works with batch inserts + messages = + for _ <- 1..total_messages do + Message.changeset(%Message{}, %{ + "topic" => random_string(), + "private" => true, + "event" => "INSERT", + "payload" => %{"value" => random_string()} + }) + end - assert_called_exactly(BatchBroadcast.broadcast(nil, tenant, :_, :_), total_messages) + Database.connect(tenant, "realtime_test", :stop) + Realtime.Repo.insert_all_entries(Message, messages, Message) + Process.sleep(500) + + assert_called_exactly(BatchBroadcast.broadcast(nil, tenant, :_, :_), total_messages) + end end test "pid is associated to the same pid for a given tenant and guarantees uniqueness", %{tenant: tenant} do @@ -108,15 +95,13 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do assert {:ok, ^pid} = ReplicationConnection.start(tenant, self()) end - test "fails on existing replication slot" do - # Warning: this will only work in testing environments as we are using the same database instance but different tenants so the replication slot will be shared - tenant1 = tenant_fixture() - tenant2 = tenant_fixture() - - assert {:ok, _pid} = ReplicationConnection.start(tenant1, self()) + test "fails on existing replication slot", %{tenant: tenant} do + {:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop) + name = "supabase_realtime_messages_replication_slot_test" + Postgrex.query!(db_conn, "SELECT pg_create_logical_replication_slot($1, 'test_decoding')", [name]) assert {:error, "Temporary Replication slot already exists and in use"} = - ReplicationConnection.start(tenant2, self()) + ReplicationConnection.start(tenant, self()) end defmodule TestHandler do @@ -176,8 +161,7 @@ defmodule Realtime.Tenants.ReplicationConnectionTest do end describe "whereis/1" do - test "returns pid if exists" do - tenant = tenant_fixture() + test "returns pid if exists", %{tenant: tenant} do assert {:ok, pid} = ReplicationConnection.start(tenant, self()) assert ReplicationConnection.whereis(tenant.external_id) == pid end diff --git a/test/realtime/tenants_test.exs b/test/realtime/tenants_test.exs index 2b921c6c4..96840e9dc 100644 --- a/test/realtime/tenants_test.exs +++ b/test/realtime/tenants_test.exs @@ -1,7 +1,6 @@ defmodule Realtime.TenantsTest do - use Realtime.DataCase - - import Mock + # async: false due to cache usage + use Realtime.DataCase, async: false alias Realtime.GenCounter alias Realtime.Tenants @@ -10,33 +9,35 @@ defmodule Realtime.TenantsTest do describe "tenants" do test "get_tenant_limits/1" do tenant = tenant_fixture() + start_supervised(GenCounter) + keys = Tenants.limiter_keys(tenant) + + for key <- keys do + GenCounter.new(key) + GenCounter.add(key, 9) + end - with_mocks([ - {GenCounter, [], [get: fn _ -> {:ok, 9} end]} - ]) do - keys = Tenants.limiter_keys(tenant) - limits = Tenants.get_tenant_limits(tenant, keys) + limits = Tenants.get_tenant_limits(tenant, keys) - [all] = - Enum.filter(limits, fn e -> e.limiter == Tenants.requests_per_second_key(tenant) end) + [all] = + Enum.filter(limits, fn e -> e.limiter == Tenants.requests_per_second_key(tenant) end) - assert all.counter == 9 + assert all.counter == 9 - [user_channels] = - Enum.filter(limits, fn e -> e.limiter == Tenants.channels_per_client_key(tenant) end) + [user_channels] = + Enum.filter(limits, fn e -> e.limiter == Tenants.channels_per_client_key(tenant) end) - assert user_channels.counter == 9 + assert user_channels.counter == 9 - [channel_joins] = - Enum.filter(limits, fn e -> e.limiter == Tenants.joins_per_second_key(tenant) end) + [channel_joins] = + Enum.filter(limits, fn e -> e.limiter == Tenants.joins_per_second_key(tenant) end) - assert channel_joins.counter == 9 + assert channel_joins.counter == 9 - [tenant_events] = - Enum.filter(limits, fn e -> e.limiter == Tenants.events_per_second_key(tenant) end) + [tenant_events] = + Enum.filter(limits, fn e -> e.limiter == Tenants.events_per_second_key(tenant) end) - assert tenant_events.counter == 9 - end + assert tenant_events.counter == 9 end end diff --git a/test/realtime_web/channels/auth/channels_authorization_test.exs b/test/realtime_web/channels/auth/channels_authorization_test.exs index 3e27c6b11..b2cdaaad3 100644 --- a/test/realtime_web/channels/auth/channels_authorization_test.exs +++ b/test/realtime_web/channels/auth/channels_authorization_test.exs @@ -1,9 +1,12 @@ defmodule RealtimeWeb.ChannelsAuthorizationTest do - use ExUnit.Case + # async: false due to usage of mocks + use ExUnit.Case, async: false + import Mock import Generators - alias RealtimeWeb.{ChannelsAuthorization, JwtVerification} + alias RealtimeWeb.ChannelsAuthorization + alias RealtimeWeb.JwtVerification @secret "" describe "authorize_conn/3" do diff --git a/test/realtime_web/channels/auth/jwt_verification_test.exs b/test/realtime_web/channels/auth/jwt_verification_test.exs index 92cb344d8..171ac6fdf 100644 --- a/test/realtime_web/channels/auth/jwt_verification_test.exs +++ b/test/realtime_web/channels/auth/jwt_verification_test.exs @@ -1,5 +1,6 @@ defmodule RealtimeWeb.JwtVerificationTest do - use ExUnit.Case, async: false + # async: false due to mock usage + use Realtime.DataCase, async: false alias RealtimeWeb.JwtVerification alias RealtimeWeb.Joken.CurrentTime.Mock @@ -13,7 +14,7 @@ defmodule RealtimeWeb.JwtVerificationTest do end setup do - {:ok, _pid} = start_supervised(Mock) + start_supervised(Mock) :ok end diff --git a/test/realtime_web/channels/realtime_channel/broadcast_handler_test.exs b/test/realtime_web/channels/realtime_channel/broadcast_handler_test.exs index b9ca191a0..03cdc346e 100644 --- a/test/realtime_web/channels/realtime_channel/broadcast_handler_test.exs +++ b/test/realtime_web/channels/realtime_channel/broadcast_handler_test.exs @@ -1,6 +1,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do - # async: false as we are using the database to test RLS policies + # async: false due to the usage of mocks use Realtime.DataCase, async: false + import Generators import Mock @@ -17,7 +18,6 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do alias RealtimeWeb.RealtimeChannel.BroadcastHandler setup [:initiate_tenant] - setup %{topic: topic}, do: Endpoint.subscribe("realtime:#{topic}") describe "call/2" do test "with write true policy, user is able to send message", %{topic: topic, tenant: tenant, db_conn: db_conn} do @@ -162,7 +162,7 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do socket end - Process.sleep(1000) + Process.sleep(1100) {:ok, %{avg: avg}} = RateCounter.get(Tenants.events_per_second_key(tenant)) assert avg > 0.0 end @@ -185,19 +185,27 @@ defmodule RealtimeWeb.RealtimeChannel.BroadcastHandlerTest do end defp initiate_tenant(context) do - start_supervised(Realtime.GenCounter) - start_supervised(Realtime.RateCounter) + start_supervised(Realtime.GenCounter.DynamicSupervisor) + start_supervised(Realtime.RateCounter.DynamicSupervisor) start_supervised(CurrentTime.Mock) - tenant = tenant_fixture() + tenant = Containers.checkout_tenant(true) + RateCounter.stop(tenant.external_id) + GenCounter.stop(tenant.external_id) + RateCounter.new(tenant.external_id) + GenCounter.new(tenant.external_id) + + on_exit(fn -> + Containers.checkin_tenant(tenant) + end) + {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id) - topic = random_string() + Process.sleep(500) - if policies = context[:policies] do - create_rls_policies(db_conn, policies, %{topic: topic}) - end + topic = random_string() + Endpoint.subscribe("realtime:#{topic}") + if policies = context[:policies], do: create_rls_policies(db_conn, policies, %{topic: topic}) - on_exit(fn -> Connect.shutdown(tenant.external_id) end) {:ok, tenant: tenant, db_conn: db_conn, topic: topic} end diff --git a/test/realtime_web/channels/realtime_channel/presence_handler_test.exs b/test/realtime_web/channels/realtime_channel/presence_handler_test.exs index aaaa07f94..f637bb215 100644 --- a/test/realtime_web/channels/realtime_channel/presence_handler_test.exs +++ b/test/realtime_web/channels/realtime_channel/presence_handler_test.exs @@ -1,6 +1,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do - # async: false as we are using the database to test RLS policies + # async: false due to the usage of mocks use Realtime.DataCase, async: false + import Generators import Mock @@ -19,7 +20,6 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do alias RealtimeWeb.RealtimeChannel.PresenceHandler setup [:initiate_tenant] - setup %{topic: topic}, do: Endpoint.subscribe("realtime:#{topic}") describe "handle/2" do test "with true policy and is private, user can track their presence and changes", %{ @@ -146,19 +146,27 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do end defp initiate_tenant(context) do - start_supervised(Realtime.GenCounter) - start_supervised(Realtime.RateCounter) + start_supervised(Realtime.GenCounter.DynamicSupervisor) + start_supervised(Realtime.RateCounter.DynamicSupervisor) start_supervised(CurrentTime.Mock) - tenant = tenant_fixture() + tenant = Containers.checkout_tenant(true) + RateCounter.stop(tenant.external_id) + GenCounter.stop(tenant.external_id) + RateCounter.new(tenant.external_id) + GenCounter.new(tenant.external_id) + + on_exit(fn -> + Containers.checkin_tenant(tenant) + end) + {:ok, db_conn} = Connect.lookup_or_start_connection(tenant.external_id) - topic = random_string() + Process.sleep(500) - if policies = context[:policies] do - create_rls_policies(db_conn, policies, %{topic: topic}) - end + topic = random_string() + Endpoint.subscribe("realtime:#{topic}") + if policies = context[:policies], do: create_rls_policies(db_conn, policies, %{topic: topic}) - on_exit(fn -> Connect.shutdown(tenant.external_id) end) {:ok, tenant: tenant, db_conn: db_conn, topic: topic} end diff --git a/test/realtime_web/channels/realtime_channel_test.exs b/test/realtime_web/channels/realtime_channel_test.exs index 32c3adc24..8435ef00e 100644 --- a/test/realtime_web/channels/realtime_channel_test.exs +++ b/test/realtime_web/channels/realtime_channel_test.exs @@ -1,4 +1,5 @@ defmodule RealtimeWeb.RealtimeChannelTest do + # async: false due to usage of mocks use ExUnit.Case, async: false use RealtimeWeb.ChannelCase @@ -189,6 +190,8 @@ defmodule RealtimeWeb.RealtimeChannelTest do end test "unsuccessful connection halts join" do + port = Enum.random(5500..9000) + extensions = [ %{ "type" => "postgres_cdc_rls", @@ -197,7 +200,7 @@ defmodule RealtimeWeb.RealtimeChannelTest do "db_name" => "false", "db_user" => "false", "db_password" => "false", - "db_port" => "5433", + "db_port" => "#{port}", "poll_interval" => 100, "poll_max_changes" => 100, "poll_max_record_bytes" => 1_048_576, @@ -208,24 +211,24 @@ defmodule RealtimeWeb.RealtimeChannelTest do ] tenant = tenant_fixture(%{extensions: extensions}) - - {:ok, %Socket{} = socket} = - connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant.external_id)) + {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant.external_id)) assert {:error, %{reason: "Realtime was unable to connect to the project database"}} = subscribe_and_join(socket, "realtime:test", %{}) end test "lack of connections halts join" do + port = Enum.random(5500..9000) + extensions = [ %{ "type" => "postgres_cdc_rls", "settings" => %{ - "db_host" => "localhost", + "db_host" => "127.0.0.1", "db_name" => "postgres", "db_user" => "supabase_admin", "db_password" => "postgres", - "db_port" => "5433", + "db_port" => "#{port}", "poll_interval" => 100, "poll_max_changes" => 100, "poll_max_record_bytes" => 1_048_576, @@ -239,6 +242,8 @@ defmodule RealtimeWeb.RealtimeChannelTest do ] tenant = tenant_fixture(%{extensions: extensions}) + tenant = Containers.initialize(tenant, true, true) + on_exit(fn -> Containers.stop_container(tenant) end) {:ok, %Socket{} = socket} = connect(UserSocket, %{"log_level" => "warning"}, conn_opts(tenant.external_id)) diff --git a/test/realtime_web/controllers/broadcast_controller_test.exs b/test/realtime_web/controllers/broadcast_controller_test.exs index a1368b3dc..2eba8c66d 100644 --- a/test/realtime_web/controllers/broadcast_controller_test.exs +++ b/test/realtime_web/controllers/broadcast_controller_test.exs @@ -1,5 +1,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do + # async: false due to usage of mocks use RealtimeWeb.ConnCase, async: false + import Mock alias Realtime.Crypto @@ -12,26 +14,25 @@ defmodule RealtimeWeb.BroadcastControllerTest do @token "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE1MTYyMzkwMjIsInJvbGUiOiJmb28iLCJleHAiOiJiYXIifQ.Ret2CevUozCsPhpgW2FMeFL7RooLgoOvfQzNpLBj5ak" @expired_token "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MjEwNzMyOTAsImlhdCI6MTYyNzg4NjQ0MCwicm9sZSI6ImFub24ifQ.AHmuaydSU3XAxwoIFhd3gwGwjnBIKsjFil0JQEOLtRw" - setup do - Cleanup.ensure_no_replication_slot() - end - describe "broadcast" do - setup %{conn: conn} do - start_supervised(RealtimeWeb.Joken.CurrentTime.Mock) - start_supervised(Realtime.RateCounter.DynamicSupervisor) - start_supervised(Realtime.GenCounter.DynamicSupervisor) - tenant = tenant_fixture() + setup %{conn: conn} do + start_supervised(RealtimeWeb.Joken.CurrentTime.Mock) + start_supervised(Realtime.RateCounter.DynamicSupervisor) + start_supervised(Realtime.GenCounter.DynamicSupervisor) - conn = - conn - |> put_req_header("accept", "application/json") - |> put_req_header("authorization", "Bearer #{@token}") - |> then(&%{&1 | host: "#{tenant.external_id}.supabase.com"}) + tenant = Containers.checkout_tenant(true) + on_exit(fn -> Containers.checkin_tenant(tenant) end) - {:ok, conn: conn, tenant: tenant} - end + conn = + conn + |> put_req_header("accept", "application/json") + |> put_req_header("authorization", "Bearer #{@token}") + |> then(&%{&1 | host: "#{tenant.external_id}.supabase.com"}) + {:ok, conn: conn, tenant: tenant} + end + + describe "broadcast" do test "returns 202 when batch of messages is broadcasted", %{conn: conn, tenant: tenant} do events_key = Tenants.events_per_second_key(tenant) @@ -68,12 +69,11 @@ defmodule RealtimeWeb.BroadcastControllerTest do ) assert_called( - Endpoint.broadcast_from( - :_, - topic_2, - "broadcast", - %{"payload" => payload_2, "event" => event_2, "type" => "broadcast"} - ) + Endpoint.broadcast_from(:_, topic_2, "broadcast", %{ + "payload" => payload_2, + "event" => event_2, + "type" => "broadcast" + }) ) assert_called_exactly(GenCounter.add(events_key), 3) @@ -129,51 +129,12 @@ defmodule RealtimeWeb.BroadcastControllerTest do end describe "too many requests" do - setup %{conn: conn} do - start_supervised(RealtimeWeb.Joken.CurrentTime.Mock) - start_supervised(Realtime.RateCounter.DynamicSupervisor) - start_supervised(Realtime.GenCounter.DynamicSupervisor) - - tenant = tenant_fixture(%{max_events_per_second: 1}) - GenCounter.new(Tenants.events_per_second_key(tenant.external_id)) - - conn = - conn - |> put_req_header("accept", "application/json") - |> put_req_header("authorization", "Bearer #{@token}") - |> then(&%{&1 | host: "#{tenant.external_id}.supabase.com"}) - - {:ok, conn: conn, tenant: tenant} - end - test "batch will exceed rate limit", %{conn: conn, tenant: tenant} do - conn = - post(conn, Routes.broadcast_path(conn, :broadcast), %{ - "messages" => - Stream.repeatedly(fn -> - %{ - "topic" => Tenants.tenant_topic(tenant, "sub_topic"), - "payload" => %{"data" => "data"}, - "event" => "event" - } - end) - |> Enum.take(10) - }) - - assert conn.status == 429 - - assert conn.resp_body == - Jason.encode!(%{ - message: "Too many messages to broadcast, please reduce the batch size" - }) - end - - test "user has hit the rate limit", %{conn: conn, tenant: tenant} do - events_key = Tenants.events_per_second_key(tenant) requests_key = Tenants.requests_per_second_key(tenant) + events_key = Tenants.events_per_second_key(tenant) with_mocks [ - {RateCounter, [], new: fn _, _ -> :ok end}, + {RateCounter, [], new: fn _, _ -> {:ok, nil} end}, {RateCounter, [], get: fn ^requests_key -> {:ok, %RateCounter{avg: 0}} @@ -182,47 +143,67 @@ defmodule RealtimeWeb.BroadcastControllerTest do ] do conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{ - "messages" => [ - %{ - "topic" => Tenants.tenant_topic(tenant, "sub_topic"), - "payload" => %{"data" => "data"}, - "event" => "event" - } - ] + "messages" => + Stream.repeatedly(fn -> + %{ + "topic" => Tenants.tenant_topic(tenant, "sub_topic"), + "payload" => %{"data" => "data"}, + "event" => "event" + } + end) + |> Enum.take(1000) }) assert conn.status == 429 assert conn.resp_body == Jason.encode!(%{ - message: "You have exceeded your rate limit" + message: "Too many messages to broadcast, please reduce the batch size" }) end end + + test "user has hit the rate limit", %{conn: conn, tenant: tenant} do + requests_key = Tenants.requests_per_second_key(tenant) + events_key = Tenants.events_per_second_key(tenant) + + with_mocks [ + {RateCounter, [], new: fn _, _ -> {:ok, nil} end}, + {RateCounter, [], + get: fn + ^requests_key -> {:ok, %RateCounter{avg: 0}} + ^events_key -> {:ok, %RateCounter{avg: 1000}} + end} + ] do + messages = [ + %{"topic" => Tenants.tenant_topic(tenant, "sub_topic"), "payload" => %{"data" => "data"}, "event" => "event"} + ] + + conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages}) + assert conn.status == 429 + assert conn.resp_body == Jason.encode!(%{message: "You have exceeded your rate limit"}) + end + end end describe "unauthorized" do test "invalid token returns 401", %{conn: conn} do - tenant = tenant_fixture() - conn = conn |> put_req_header("accept", "application/json") |> put_req_header("x-api-key", "potato") - |> then(&%{&1 | host: "#{tenant.external_id}.supabase.com"}) + |> then(&%{&1 | host: "dev_tenant.supabase.com"}) conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{}) assert conn.status == 401 end test "expired token returns 401", %{conn: conn} do - tenant = tenant_fixture() - conn = conn |> put_req_header("accept", "application/json") |> put_req_header("x-api-key", @expired_token) - |> then(&%{&1 | host: "#{tenant.external_id}.supabase.com"}) + |> then(&%{&1 | host: "dev_tenant.supabase.com"}) conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{}) assert conn.status == 401 @@ -246,7 +227,9 @@ defmodule RealtimeWeb.BroadcastControllerTest do start_supervised(Realtime.GenCounter.DynamicSupervisor) start_supervised(RealtimeWeb.Joken.CurrentTime.Mock) - tenant = tenant_fixture() + tenant = Containers.checkout_tenant(true) + on_exit(fn -> Containers.checkin_tenant(tenant) end) + jwt_secret = Crypto.decrypt!(tenant.jwt_secret) {:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop) @@ -332,13 +315,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do messages = messages ++ - [ - %{ - "topic" => "open_channel", - "payload" => %{"content" => random_string()}, - "event" => random_string() - } - ] + [%{"topic" => "open_channel", "payload" => %{"content" => random_string()}, "event" => random_string()}] conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages}) @@ -348,19 +325,8 @@ defmodule RealtimeWeb.BroadcastControllerTest do end) # Check open channel - assert_called( - Endpoint.broadcast_from( - :_, - Tenants.tenant_topic(tenant, "open_channel"), - "broadcast", - :_ - ) - ) - - assert_called_exactly( - GenCounter.add(Tenants.events_per_second_key(tenant)), - length(channels) + 1 - ) + assert_called(Endpoint.broadcast_from(:_, Tenants.tenant_topic(tenant, "open_channel"), "broadcast", :_)) + assert_called_exactly(GenCounter.add(Tenants.events_per_second_key(tenant)), length(channels) + 1) assert conn.status == 202 end @@ -400,18 +366,10 @@ defmodule RealtimeWeb.BroadcastControllerTest do end) assert_not_called( - Endpoint.broadcast_from( - :_, - Tenants.tenant_topic(tenant, no_auth_channel.topic, false), - "broadcast", - :_ - ) + Endpoint.broadcast_from(:_, Tenants.tenant_topic(tenant, no_auth_channel.topic, false), "broadcast", :_) ) - assert_called_exactly( - GenCounter.add(Tenants.events_per_second_key(tenant)), - length(messages_to_send) - ) + assert_called_exactly(GenCounter.add(Tenants.events_per_second_key(tenant)), length(messages_to_send)) assert conn.status == 202 end @@ -460,13 +418,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do defp generate_message_with_policies(db_conn, tenant) do message = message_fixture(tenant) - - create_rls_policies( - db_conn, - [:authenticated_read_broadcast, :authenticated_write_broadcast], - message - ) - + create_rls_policies(db_conn, [:authenticated_read_broadcast, :authenticated_write_broadcast], message) message end end diff --git a/test/realtime_web/controllers/tenant_controller_test.exs b/test/realtime_web/controllers/tenant_controller_test.exs index 92cc80713..cd409b743 100644 --- a/test/realtime_web/controllers/tenant_controller_test.exs +++ b/test/realtime_web/controllers/tenant_controller_test.exs @@ -1,10 +1,9 @@ defmodule RealtimeWeb.TenantControllerTest do - # async: false required due to the delete tests that connects to the database directly and might interfere with other tests + # async: false due to the usage of mocks use RealtimeWeb.ConnCase, async: false import Mock - alias Ecto.Adapters.SQL.Sandbox alias Realtime.Api.Tenant alias Realtime.Crypto alias Realtime.Database @@ -27,7 +26,7 @@ defmodule RealtimeWeb.TenantControllerTest do @default_tenant_attrs %{ "external_id" => "external_id", - "name" => "localhost", + "name" => "external_id", "extensions" => [ %{ "type" => "postgres_cdc_rls", @@ -51,10 +50,6 @@ defmodule RealtimeWeb.TenantControllerTest do @invalid_attrs %{external_id: nil, jwt_secret: nil, extensions: [], name: nil} setup %{conn: conn} do - # Ensure no replication slot is present before the test - Cleanup.ensure_no_replication_slot() - Sandbox.checkout(Realtime.Repo) - Application.put_env(:realtime, :db_enc_key, "1234567890123456") new_conn = @@ -136,7 +131,14 @@ defmodule RealtimeWeb.TenantControllerTest do end describe "update tenant" do - setup [:create_tenant] + setup do + tenant = tenant_fixture() + tenant = Containers.initialize(tenant, true, true) + {:ok, _} = Realtime.Tenants.Connect.lookup_or_start_connection(tenant.external_id) + Process.sleep(1000) + on_exit(fn -> Containers.stop_container(tenant) end) + %{tenant: tenant} + end test "renders tenant when data is valid", %{ conn: conn, @@ -170,13 +172,13 @@ defmodule RealtimeWeb.TenantControllerTest do end describe "delete tenant" do - setup [:create_tenant] - - setup %{tenant: tenant} do + setup do + tenant = tenant_fixture() + tenant = Containers.initialize(tenant, true, true) {:ok, _} = Realtime.Tenants.Connect.lookup_or_start_connection(tenant.external_id) Process.sleep(1000) - on_exit(fn -> Realtime.Tenants.Connect.shutdown(tenant.external_id) end) - :ok + on_exit(fn -> Containers.stop_container(tenant) end) + %{tenant: tenant} end test "deletes chosen tenant", %{conn: conn, tenant: tenant} do @@ -256,11 +258,12 @@ defmodule RealtimeWeb.TenantControllerTest do test "healthy tenant with 0 client connections", %{ conn: conn, - tenant: %Tenant{external_id: ext_id} + tenant: %Tenant{external_id: external_id} } do with_mock JwtVerification, verify: fn _token, _secret, _jwks -> {:ok, %{}} end do - conn = get(conn, ~p"/api/tenants/#{ext_id}/health") + conn = get(conn, ~p"/api/tenants/#{external_id}/health") data = json_response(conn, 200)["data"] + Connect.shutdown(external_id) assert %{ "healthy" => true, @@ -323,22 +326,16 @@ defmodule RealtimeWeb.TenantControllerTest do end end - test "runs migrations", %{ - conn: conn, - tenant: %Tenant{external_id: ext_id} = tenant - } do + test "runs migrations", %{conn: conn} do with_mock JwtVerification, verify: fn _token, _secret, _jwks -> {:ok, %{}} end do - {:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop) - - Database.transaction(db_conn, fn transaction_conn -> - Postgrex.query!(transaction_conn, "DROP SCHEMA realtime CASCADE", []) - Postgrex.query!(transaction_conn, "CREATE SCHEMA realtime", []) - Postgrex.query!(transaction_conn, "DROP ROLE supabase_realtime_admin", []) - end) + tenant = tenant_fixture() + tenant = Containers.initialize(tenant, true) + on_exit(fn -> Containers.stop_container(tenant) end) + {:ok, db_conn} = Database.connect(tenant, "realtime_test", :stop) assert {:error, _} = Postgrex.query(db_conn, "SELECT * FROM realtime.messages", []) - conn = get(conn, ~p"/api/tenants/#{ext_id}/health") + conn = get(conn, ~p"/api/tenants/#{tenant.external_id}/health") data = json_response(conn, 200)["data"] Process.sleep(2000) @@ -357,6 +354,8 @@ defmodule RealtimeWeb.TenantControllerTest do end defp create_tenant(_) do - %{tenant: tenant_fixture()} + tenant = Containers.checkout_tenant(true) + on_exit(fn -> Containers.checkin_tenant(tenant) end) + %{tenant: tenant} end end diff --git a/test/realtime_web/plugs/assign_tenant_test.exs b/test/realtime_web/plugs/assign_tenant_test.exs index 29934e719..93b8c6ee4 100644 --- a/test/realtime_web/plugs/assign_tenant_test.exs +++ b/test/realtime_web/plugs/assign_tenant_test.exs @@ -4,8 +4,8 @@ defmodule RealtimeWeb.Plugs.AssignTenantTest do alias Realtime.Api @tenant %{ - "external_id" => "localhost", - "name" => "localhost", + "external_id" => "external_id", + "name" => "external_id", "extensions" => [ %{ "type" => "postgres_cdc_rls", @@ -56,6 +56,8 @@ defmodule RealtimeWeb.Plugs.AssignTenantTest do end test "assigns a tenant", %{conn: conn} do + tenant_fixture(%{external_id: "localhost"}) + conn = conn |> Map.put(:host, "localhost.localhost.com") @@ -65,6 +67,8 @@ defmodule RealtimeWeb.Plugs.AssignTenantTest do end test "assigns a tenant even with lots of subdomains", %{conn: conn} do + tenant_fixture(%{external_id: "localhost"}) + conn = conn |> Map.put(:host, "localhost.realtime.localhost.com") diff --git a/test/support/conn_case.ex b/test/support/conn_case.ex index 0a320eec8..fd775aa2d 100644 --- a/test/support/conn_case.ex +++ b/test/support/conn_case.ex @@ -37,9 +37,8 @@ defmodule RealtimeWeb.ConnCase do end setup tags do - :ok = Sandbox.checkout(Realtime.Repo) - - unless tags[:async] do + if !tags[:async] do + :ok = Sandbox.checkout(Realtime.Repo) Sandbox.mode(Realtime.Repo, {:shared, self()}) end diff --git a/test/support/containers.ex b/test/support/containers.ex new file mode 100644 index 000000000..5c662e2a2 --- /dev/null +++ b/test/support/containers.ex @@ -0,0 +1,148 @@ +defmodule Containers do + alias Realtime.Tenants.Connect + alias Realtime.Database + alias Realtime.Tenants.Migrations + + import ExUnit.CaptureLog + defstruct [:port, :tenant, using?: false] + + @type t :: %__MODULE__{port: integer(), tenant: Realtime.Api.Tenant.t(), using?: boolean()} + def initialize(tenant, lock? \\ false, run_migrations? \\ false) do + capture_log(fn -> + if :ets.whereis(:containers) == :undefined, do: :ets.new(:containers, [:named_table, :set, :public]) + + name = "realtime-test-#{tenant.external_id}" + %{port: port} = Database.from_tenant(tenant, "realtime_test", :stop) + + {_, 0} = + System.cmd("docker", [ + "run", + "-d", + "--name", + name, + "-e", + "POSTGRES_HOST=/var/run/postgresql", + "-e", + "POSTGRES_PASSWORD=postgres", + "-p", + "#{port}:5432", + "supabase/postgres:15.8.1.040", + "postgres", + "-c", + "config_file=/etc/postgresql/postgresql.conf" + ]) + + check_container_ready(name) + check_select_possible(tenant) + :ets.insert(:containers, {tenant.external_id, %{tenant: tenant, using?: lock?}}) + end) + + if run_migrations? do + Migrations.run_migrations(tenant) + {:ok, pid} = Database.connect(tenant, "realtime_test", :stop) + Migrations.create_partitions(pid) + end + + tenant + end + + def checkout_tenant(run_migrations? \\ false) do + tenants = :ets.select(:containers, [{{:_, %{using?: :"$1", tenant: :"$2"}}, [{:==, :"$1", false}], [:"$2"]}]) + tenant = Enum.random(tenants) + + :ets.insert(:containers, {tenant.external_id, %{tenant: tenant, using?: true}}) + + if run_migrations? do + Migrations.run_migrations(tenant) + {:ok, pid} = Database.connect(tenant, "realtime_test", :stop) + Migrations.create_partitions(pid) + end + + tenant + end + + def checkin_tenant(tenant) do + settings = Database.from_tenant(tenant, "realtime_test", :stop) + + settings = %{settings | max_restarts: 0, ssl: false} + {:ok, conn} = Database.connect_db(settings) + + Database.transaction(conn, fn db_conn -> + pid = Connect.whereis(tenant.external_id) + if pid && Process.alive?(pid), do: Connect.shutdown(tenant.external_id) + Postgrex.query(db_conn, "DROP SCHEMA realtime CASCADE", []) + Postgrex.query(db_conn, "CREATE SCHEMA realtime", []) + end) + + :ets.insert(:containers, {tenant.external_id, %{tenant: tenant, using?: false}}) + end + + def stop_container(tenant) do + :ets.delete(:containers, tenant.external_id) + pid = Connect.whereis(tenant.external_id) + if pid && Process.alive?(pid), do: Connect.shutdown(tenant.external_id) + name = "realtime-test-#{tenant.external_id}" + System.cmd("docker", ["rm", "-f", name]) + end + + def stop_containers() do + {list, 0} = System.cmd("docker", ["ps", "-a", "--format", "{{.Names}}", "--filter", "name=realtime-test-*"]) + names = list |> String.trim() |> String.split("\n") + + for name <- names do + System.cmd("docker", ["rm", "-f", name]) + end + end + + defp check_container_ready(name, attempts \\ 50) + defp check_container_ready(name, 0), do: raise("Container #{name} is not ready") + + defp check_container_ready(name, attempts) do + case System.cmd("docker", ["exec", name, "pg_isready"]) do + {_, 0} -> + :ok + + {_, _} -> + Process.sleep(500) + check_container_ready(name, attempts - 1) + end + end + + defp check_select_possible(tenant, attempts \\ 100) + defp check_select_possible(_, 0), do: raise("Select is not possible") + + defp check_select_possible(tenant, attempts) do + Process.flag(:trap_exit, true) + + settings = + tenant + |> Realtime.Database.from_tenant("realtime_check", :stop) + |> Map.from_struct() + |> Enum.to_list() + |> Keyword.new() + |> Keyword.put(:max_restarts, 0) + |> Keyword.put(:ssl, false) + |> Keyword.put(:log, false) + + {:ok, db_conn} = Postgrex.start_link(settings) + + case Postgrex.query(db_conn, "SELECT 1", []) do + {:ok, _} -> + :ok + + _ -> + Process.sleep(500) + check_select_possible(tenant, attempts - 1) + end + catch + :exit, _ -> + Process.sleep(500) + check_select_possible(tenant, attempts - 1) + + _ -> + Process.sleep(500) + check_select_possible(tenant, attempts - 1) + after + Process.flag(:trap_exit, false) + end +end diff --git a/test/support/data_case.ex b/test/support/data_case.ex index ee156aee6..91c1da327 100644 --- a/test/support/data_case.ex +++ b/test/support/data_case.ex @@ -29,9 +29,8 @@ defmodule Realtime.DataCase do end setup tags do - :ok = Sandbox.checkout(Realtime.Repo) - - unless tags[:async] do + if !tags[:async] do + :ok = Sandbox.checkout(Realtime.Repo) Sandbox.mode(Realtime.Repo, {:shared, self()}) end diff --git a/test/support/generators.ex b/test/support/generators.ex index 2c4bf6041..b670445ec 100644 --- a/test/support/generators.ex +++ b/test/support/generators.ex @@ -7,22 +7,25 @@ defmodule Generators do @spec tenant_fixture(map()) :: Realtime.Api.Tenant.t() def tenant_fixture(override \\ %{}) do + port = Enum.random(5500..9000) + create_attrs = %{ "external_id" => random_string(), - "name" => "localhost", + "name" => "tenant", "extensions" => [ %{ "type" => "postgres_cdc_rls", "settings" => %{ - "db_host" => "localhost", + "db_host" => "127.0.0.1", "db_name" => "postgres", "db_user" => "supabase_admin", "db_password" => "postgres", - "db_port" => "5433", + "db_port" => "#{port}", "poll_interval" => 100, "poll_max_changes" => 100, "poll_max_record_bytes" => 1_048_576, "region" => "us-east-1", + "publication" => "supabase_realtime_test", "ssl_enforced" => false } } @@ -62,7 +65,7 @@ defmodule Generators do channel end - def random_string(length \\ 10) do + def random_string(length \\ 20) do length |> :crypto.strong_rand_bytes() |> Base.encode32() diff --git a/test/test_helper.exs b/test/test_helper.exs index 0a51ea1ef..72a5083fd 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,2 +1,57 @@ -ExUnit.start(exclude: [:failing], max_cases: 1) -Ecto.Adapters.SQL.Sandbox.mode(Realtime.Repo, :manual) +alias Ecto.Adapters.SQL.Sandbox +alias Realtime.Api +alias Realtime.Database +max_cases = System.get_env("MAX_CASES", "2") |> String.to_integer() +ExUnit.start(exclude: [:failing], max_cases: max_cases) +Containers.stop_containers() + +for tenant <- Api.list_tenants() do + Api.delete_tenant(tenant) +end + +tenant_name = "dev_tenant" +publication = "supabase_realtime_test" +port = Enum.random(5500..9000) +opts = %{external_id: tenant_name, name: tenant_name, port: port, jwt_secret: "secure_jwt_secret"} +tenant = Generators.tenant_fixture(opts) + +# Start dev_realtime container to be used in integration tests +Containers.initialize(tenant, true, true) +{:ok, conn} = Database.connect(tenant, "realtime_seed", :stop) + +Database.transaction(conn, fn db_conn -> + queries = [ + "create sequence if not exists test_id_seq;", + """ + create table "public"."test" ( + "id" int4 not null default nextval('test_id_seq'::regclass), + "details" text, + primary key ("id")); + """, + "grant all on table public.test to anon;", + "grant all on table public.test to postgres;", + "grant all on table public.test to authenticated;", + "create publication #{publication} for all tables" + ] + + Enum.each(queries, &Postgrex.query!(db_conn, &1, [])) +end) + +containers = Application.get_env(:ex_unit, :max_cases, System.schedulers()) * 2 +tenants = for _ <- 0..containers, do: Generators.tenant_fixture() + +# Start other containers to be used based on max test cases +Task.await_many( + for tenant <- tenants do + Task.async(fn -> Containers.initialize(tenant) end) + end, + :infinity +) + +ExUnit.after_suite(fn _ -> + Sandbox.checkout(Realtime.Repo) + + Enum.each(tenants, &Realtime.Api.delete_tenant/1) +end) + +Ecto.Adapters.SQL.Sandbox.mode(Realtime.Repo, :auto) From 77b8d02ffdfc53e565a570833aae2f634d9ff751 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Mon, 24 Feb 2025 22:55:10 +0000 Subject: [PATCH 2/2] rebase main --- lib/realtime/tenants/connect.ex | 10 +++---- mix.exs | 2 +- test/realtime/database_test.exs | 24 ++++++++++++++--- test/realtime/tenants/connect_test.exs | 15 ++++++++--- .../controllers/broadcast_controller_test.exs | 27 +++++++++++++------ 5 files changed, 56 insertions(+), 22 deletions(-) diff --git a/lib/realtime/tenants/connect.ex b/lib/realtime/tenants/connect.ex index d3080be1e..6c8b23f7e 100644 --- a/lib/realtime/tenants/connect.ex +++ b/lib/realtime/tenants/connect.ex @@ -299,11 +299,6 @@ defmodule Realtime.Tenants.Connect do {:stop, :normal, state} end - # Ignore unsuspend messages to avoid handle_info unmatched functions - def handle_info(:unsuspend_tenant, state) do - {:noreply, state} - end - def handle_info( {:DOWN, db_conn_reference, _, _, _}, %{db_conn_reference: db_conn_reference} = state @@ -312,6 +307,11 @@ defmodule Realtime.Tenants.Connect do {:stop, :normal, state} end + # Ignore messages to avoid handle_info unmatched functions + def handle_info(_, state) do + {:noreply, state} + end + @impl true def terminate(reason, %{tenant_id: tenant_id}) do Logger.info("Tenant #{tenant_id} has been terminated: #{inspect(reason)}") diff --git a/mix.exs b/mix.exs index 3058883eb..a857793b3 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.34.33", + version: "2.34.34", elixir: "~> 1.17.3", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/realtime/database_test.exs b/test/realtime/database_test.exs index a483a66ce..adf06c7b4 100644 --- a/test/realtime/database_test.exs +++ b/test/realtime/database_test.exs @@ -10,6 +10,7 @@ defmodule Realtime.DatabaseTest do setup do tenant = Containers.checkout_tenant() + :telemetry.attach(__MODULE__, [:realtime, :database, :transaction], &__MODULE__.handle_telemetry/4, pid: self()) on_exit(fn -> :telemetry.detach(__MODULE__) @@ -62,7 +63,10 @@ defmodule Realtime.DatabaseTest do end describe "replication_slot_teardown/1" do - test "removes replication slots with the realtime prefix", %{tenant: tenant} do + test "removes replication slots with the realtime prefix" do + tenant = tenant_fixture() + tenant = Containers.initialize(tenant, true) + on_exit(fn -> Containers.stop_container(tenant) end) {:ok, conn} = Database.connect(tenant, "realtime_test", :stop) Postgrex.query!( @@ -77,7 +81,11 @@ defmodule Realtime.DatabaseTest do end describe "replication_slot_teardown/2" do - test "removes replication slots with a given name and existing connection", %{tenant: tenant} do + test "removes replication slots with a given name and existing connection" do + tenant = tenant_fixture() + tenant = Containers.initialize(tenant, true) + on_exit(fn -> Containers.stop_container(tenant) end) + name = String.downcase("slot_#{random_string()}") {:ok, conn} = Database.connect(tenant, "realtime_test", :stop) @@ -92,7 +100,11 @@ defmodule Realtime.DatabaseTest do assert %{rows: []} = Postgrex.query!(conn, "SELECT slot_name FROM pg_replication_slots", []) end - test "removes replication slots with a given name and a tenant", %{tenant: tenant} do + test "removes replication slots with a given name and a tenant" do + tenant = tenant_fixture() + tenant = Containers.initialize(tenant, true) + on_exit(fn -> Containers.stop_container(tenant) end) + name = String.downcase("slot_#{random_string()}") {:ok, conn} = Database.connect(tenant, "realtime_test", :stop) @@ -243,7 +255,11 @@ defmodule Realtime.DatabaseTest do end describe "from_settings/3" do - test "returns struct with correct setup", %{tenant: tenant} do + test "returns struct with correct setup" do + tenant = tenant_fixture() + tenant = Containers.initialize(tenant, true) + on_exit(fn -> Containers.stop_container(tenant) end) + application_name = "realtime_connect" backoff = :stop {:ok, ip_version} = Database.detect_ip_version("127.0.0.1") diff --git a/test/realtime/tenants/connect_test.exs b/test/realtime/tenants/connect_test.exs index 22381a962..4c0f3a6fd 100644 --- a/test/realtime/tenants/connect_test.exs +++ b/test/realtime/tenants/connect_test.exs @@ -167,8 +167,14 @@ defmodule Realtime.Tenants.ConnectTest do end test "handles tenant suspension only on targetted suspended user" do - tenant1 = tenant_fixture() - tenant2 = tenant_fixture() + tenant1 = Containers.checkout_tenant(true) + tenant2 = Containers.checkout_tenant(true) + + on_exit(fn -> + Containers.checkin_tenant(tenant1) + Containers.checkin_tenant(tenant2) + end) + assert {:ok, db_conn} = Connect.lookup_or_start_connection(tenant1.external_id) Process.sleep(1000) @@ -206,7 +212,7 @@ defmodule Realtime.Tenants.ConnectTest do ] }) - tenant = Containers.initialize(tenant) + tenant = Containers.initialize(tenant, true) Enum.each(1..10, fn _ -> Task.start(fn -> @@ -397,7 +403,8 @@ defmodule Realtime.Tenants.ConnectTest do ] tenant = tenant_fixture(%{extensions: extensions}) - tenant = Containers.initialize(tenant) + tenant = Containers.initialize(tenant, true) + on_exit(fn -> Containers.stop_container(tenant) end) assert {:error, :tenant_db_too_many_connections} = Connect.lookup_or_start_connection(tenant.external_id) diff --git a/test/realtime_web/controllers/broadcast_controller_test.exs b/test/realtime_web/controllers/broadcast_controller_test.exs index 2eba8c66d..1dfd49e32 100644 --- a/test/realtime_web/controllers/broadcast_controller_test.exs +++ b/test/realtime_web/controllers/broadcast_controller_test.exs @@ -23,11 +23,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do tenant = Containers.checkout_tenant(true) on_exit(fn -> Containers.checkin_tenant(tenant) end) - conn = - conn - |> put_req_header("accept", "application/json") - |> put_req_header("authorization", "Bearer #{@token}") - |> then(&%{&1 | host: "#{tenant.external_id}.supabase.com"}) + conn = generate_conn(conn, tenant) {:ok, conn: conn, tenant: tenant} end @@ -163,7 +159,11 @@ defmodule RealtimeWeb.BroadcastControllerTest do end end - test "user has hit the rate limit", %{conn: conn, tenant: tenant} do + test "user has hit the rate limit", %{conn: conn} do + tenant = tenant_fixture() + tenant = Containers.initialize(tenant, true, true) + on_exit(fn -> Containers.stop_container(tenant) end) + requests_key = Tenants.requests_per_second_key(tenant) events_key = Tenants.events_per_second_key(tenant) @@ -179,6 +179,7 @@ defmodule RealtimeWeb.BroadcastControllerTest do %{"topic" => Tenants.tenant_topic(tenant, "sub_topic"), "payload" => %{"data" => "data"}, "event" => "event"} ] + conn = generate_conn(conn, tenant) conn = post(conn, Routes.broadcast_path(conn, :broadcast), %{"messages" => messages}) assert conn.status == 429 assert conn.resp_body == Jason.encode!(%{message: "You have exceeded your rate limit"}) @@ -378,13 +379,16 @@ defmodule RealtimeWeb.BroadcastControllerTest do @tag role: "anon" test "user without permission won't broadcast", %{ conn: conn, - db_conn: db_conn, - tenant: tenant + db_conn: db_conn } do with_mocks [ {Endpoint, [:passthrough], broadcast_from: fn _, _, _, _ -> :ok end}, {GenCounter, [:passthrough], add: fn _ -> :ok end} ] do + tenant = tenant_fixture() + tenant = Containers.initialize(tenant, true, true) + on_exit(fn -> Containers.stop_container(tenant) end) + messages = Stream.repeatedly(fn -> generate_message_with_policies(db_conn, tenant) end) |> Enum.take(5) @@ -421,4 +425,11 @@ defmodule RealtimeWeb.BroadcastControllerTest do create_rls_policies(db_conn, [:authenticated_read_broadcast, :authenticated_write_broadcast], message) message end + + defp generate_conn(conn, tenant) do + conn + |> put_req_header("accept", "application/json") + |> put_req_header("authorization", "Bearer #{@token}") + |> then(&%{&1 | host: "#{tenant.external_id}.supabase.com"}) + end end