From db43ea7cbdae4c410d244f550653901908e49680 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Filipe=20Caba=C3=A7o?= Date: Sat, 13 Apr 2024 00:56:38 +0100 Subject: [PATCH] add tests --- Makefile | 3 +- lib/realtime/tenants.ex | 5 +- lib/realtime/tenants/cache_pub_sub_handler.ex | 5 + lib/realtime/tenants/connect.ex | 16 +-- lib/realtime_web/channels/realtime_channel.ex | 7 +- .../controllers/tenant_controller.ex | 7 +- lib/realtime_web/router.ex | 1 + mix.exs | 2 +- test/integration/rt_channel_test.exs | 20 ++-- .../controllers/tenant_controller_test.exs | 100 +++++++++++++++++- test/support/generators.ex | 29 +++++ 11 files changed, 157 insertions(+), 38 deletions(-) diff --git a/Makefile b/Makefile index 404615000..deabfb6b2 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,8 @@ PORT ?= 4000 .PHONY: dev dev.orange seed prod bench.% dev_db start start.% stop stop.% rebuild rebuild.% dev: - ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=$(PORT) MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev REGION=fra FLY_ALLOC_ID=123e4567-e89b-12d3-a456-426614174000 DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" iex --name $(NODE_NAME)@127.0.0.1 --cookie cookie -S mix phx.server + ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=$(PORT) MIX_ENV=dev SECURE_Case +S=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev REGION=fra FLY_ALLOC_ID=123e4567-e89b-12d3-a456-426614174000 DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" iex --name $(NODE_NAME)@127.0.0.1 --cookie cookie -S mix phx.server dev.orange: ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=4001 MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev FLY_REGION=fra FLY_ALLOC_ID=123e4567-e89b-12d3-a456-426614174000 DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" iex --name orange@127.0.0.1 --cookie cookie -S mix phx.server diff --git a/lib/realtime/tenants.ex b/lib/realtime/tenants.ex index 33d9fd7a0..9da2c6392 100644 --- a/lib/realtime/tenants.ex +++ b/lib/realtime/tenants.ex @@ -265,7 +265,10 @@ defmodule Realtime.Tenants do """ @spec disconnect_clients(String.t()) :: :ok def disconnect_clients(external_id) do - broadcast_operation_event(:disconnect, external_id) + suspend_tenant_by_external_id(external_id) + :timer.sleep(1000) + unsuspend_tenant_by_external_id(external_id) + :ok end defp broadcast_operation_event(action, external_id) do diff --git a/lib/realtime/tenants/cache_pub_sub_handler.ex b/lib/realtime/tenants/cache_pub_sub_handler.ex index 9a9042b88..f13541485 100644 --- a/lib/realtime/tenants/cache_pub_sub_handler.ex +++ b/lib/realtime/tenants/cache_pub_sub_handler.ex @@ -25,4 +25,9 @@ defmodule Realtime.Tenants.CachePubSubHandler do Cache.invalidate_tenant_cache(tenant_id) {:noreply, state} end + + # Ignore operations + def handle_info({_, _}, state) do + {:noreply, state} + end end diff --git a/lib/realtime/tenants/connect.ex b/lib/realtime/tenants/connect.ex index 6406997f7..349d490fd 100644 --- a/lib/realtime/tenants/connect.ex +++ b/lib/realtime/tenants/connect.ex @@ -147,27 +147,19 @@ defmodule Realtime.Tenants.Connect do end def handle_info( - {:disconnect, tenant_id}, + {:suspend_tenant, tenant_id}, %{db_conn_pid: db_conn_pid, tenant_id: tenant_id} = state ) do - Logger.info("Tenant has requested to disconnect, database connection will be terminated") + Logger.warning("Tenant was suspended, database connection will be terminated") :ok = GenServer.stop(db_conn_pid, :normal, 1000) {:stop, :normal, state} end - def handle_info({:disconnect, _}, state) do + # Ignore suspend_tenant if they are not related with this tenant + def handle_info({:suspend_tenant, _}, state) do {:noreply, state} end - def handle_info( - {:suspend_tenant, tenant_id}, - %{db_conn_pid: db_conn_pid, tenant_id: tenant_id} = state - ) do - Logger.warning("Tenant was suspended, database connection will be terminated") - :ok = GenServer.stop(db_conn_pid, :normal, 1000) - {:stop, :normal, state} - end - # Ignore unsuspend messages to avoid handle_info unmatched functions def handle_info({:unsuspend_tenant, _}, state) do {:noreply, state} diff --git a/lib/realtime_web/channels/realtime_channel.ex b/lib/realtime_web/channels/realtime_channel.ex index 3835ab165..a9a89a5d4 100644 --- a/lib/realtime_web/channels/realtime_channel.ex +++ b/lib/realtime_web/channels/realtime_channel.ex @@ -249,16 +249,11 @@ defmodule RealtimeWeb.RealtimeChannel do end end - def handle_info({:suspend_tenant, tenant_id}, %{tenant_id: tenant_id} = state) do + def handle_info({:suspend_tenant, tenant_id}, %{assigns: %{tenant: tenant_id}} = state) do Logger.info("Tenant suspend, disconencting the socket") {:stop, :normal, state} end - def handle_info({:disconnect, tenant_id}, %{tenant_id: tenant_id} = state) do - Logger.info("Socket disconnection requested by the user") - {:stop, :normal, state} - end - # Ignore operations def handle_info({_, _}, state) do {:noreply, state} diff --git a/lib/realtime_web/controllers/tenant_controller.ex b/lib/realtime_web/controllers/tenant_controller.ex index f15eb7b6e..83a46bd5a 100644 --- a/lib/realtime_web/controllers/tenant_controller.ex +++ b/lib/realtime_web/controllers/tenant_controller.ex @@ -263,12 +263,13 @@ defmodule RealtimeWeb.TenantController do } ) - def disconnect_clients(%{assigns: %{tenant: tenant}} = conn, %{"tenant_id" => tenant_id}) do - Tenants.disconnect_clients(tenant_id) - PostgresCdc.stop_all(tenant, @stop_timeout) + def disconnect_clients(%{assigns: %{tenant: tenant, role: "service_role"}} = conn, _) do + :ok = Realtime.Tenants.disconnect_clients(tenant.external_id) send_resp(conn, 204, "") end + def disconnect_clients(_, _), do: {:error, :unauthorized} + operation(:health, summary: "Tenant health", parameters: [ diff --git a/lib/realtime_web/router.ex b/lib/realtime_web/router.ex index 7dfc01b6f..10318b2c2 100644 --- a/lib/realtime_web/router.ex +++ b/lib/realtime_web/router.ex @@ -104,6 +104,7 @@ defmodule RealtimeWeb.Router do pipe_through([:open_cors, :tenant_api, :secure_tenant_api]) post("/broadcast", BroadcastController, :broadcast) + delete("/disconnect_clients", TenantController, :disconnect_clients) end scope "/api", RealtimeWeb do diff --git a/mix.exs b/mix.exs index e395034fd..18a76a426 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do def project do [ app: :realtime, - version: "2.28.28", + version: "2.28.29", elixir: "~> 1.14.0", elixirc_paths: elixirc_paths(Mix.env()), start_permanent: Mix.env() == :prod, diff --git a/test/integration/rt_channel_test.exs b/test/integration/rt_channel_test.exs index d568bc132..20c178506 100644 --- a/test/integration/rt_channel_test.exs +++ b/test/integration/rt_channel_test.exs @@ -623,20 +623,14 @@ defmodule Realtime.Integration.RtChannelTest do end end - defp token_valid(role), do: generate_token(%{role: role}) - defp token_no_role(), do: generate_token() - - defp generate_token(claims \\ %{}) do - claims = - %{ - ref: "localhost", - iat: System.system_time(:second), - exp: System.system_time(:second) + 604_800 - } - |> Map.merge(claims) + defp token_valid(role) do + {token, _} = generate_token(%{role: role}, @secret) + {:ok, token} + end - signer = Joken.Signer.create("HS256", @secret) - Joken.Signer.sign(claims, signer) + defp token_no_role() do + {token, _} = generate_token(@secret) + {:ok, token} end defp get_connection(role \\ "anon") do diff --git a/test/realtime_web/controllers/tenant_controller_test.exs b/test/realtime_web/controllers/tenant_controller_test.exs index 1e27efa1e..387adf7fb 100644 --- a/test/realtime_web/controllers/tenant_controller_test.exs +++ b/test/realtime_web/controllers/tenant_controller_test.exs @@ -3,6 +3,7 @@ defmodule RealtimeWeb.TenantControllerTest do use RealtimeWeb.ConnCase, async: false import Mock + import Phoenix.ChannelTest import Realtime.Helpers, only: [encrypt!: 2, transaction: 2] alias Realtime.Api.Tenant @@ -15,6 +16,7 @@ defmodule RealtimeWeb.TenantControllerTest do alias RealtimeWeb.ChannelsAuthorization alias RealtimeWeb.JwtVerification + alias RealtimeWeb.Joken.CurrentTime @update_attrs %{ jwt_secret: "some updated jwt_secret", @@ -135,7 +137,7 @@ defmodule RealtimeWeb.TenantControllerTest do setup %{tenant: tenant} do [extension] = tenant.extensions args = Map.put(extension.settings, "id", tenant.external_id) - {:ok, _} = Realtime.PostgresCdc.connect(Extensions.PostgresCdcStream, args) + Realtime.PostgresCdc.connect(Extensions.PostgresCdcStream, args) on_exit(fn -> Realtime.PostgresCdc.stop_all(tenant) end) :ok end @@ -287,7 +289,103 @@ defmodule RealtimeWeb.TenantControllerTest do end end + describe "disconnect clients" do + setup do + start_supervised!(CurrentTime.Mock) + + Application.put_env(:realtime, :db_enc_key, "1234567890123456") + {socket, tenant, jwt_secret} = create_socket() + {socket_2, _tenant_2, _jwt_secret_2} = create_socket() + %{socket: socket, socket_2: socket_2, tenant: tenant, jwt_secret: jwt_secret} + end + + test "disconnects clients when disconnect clients is called and then it's able to reconnect", + %{ + conn: conn, + socket: socket, + socket_2: socket_2, + tenant: tenant, + jwt_secret: jwt_secret + } do + {token, _} = generate_token(%{role: "service_role"}, jwt_secret) + + conn = + conn + |> put_req_header("accept", "application/json") + |> put_req_header("apikey", token) + |> delete_req_header("authorization") + |> then(&%{&1 | host: "#{tenant.external_id}.supabase.com"}) + |> delete(~p"/api/disconnect_clients") + + assert conn.status == 204 + :timer.sleep(1000) + refute socket.channel_pid |> Process.alive?() + assert socket_2.channel_pid |> Process.alive?() + :timer.sleep(1000) + + {socket, _tenant, _jwt_secret} = create_socket(tenant, jwt_secret) + assert socket.channel_pid |> Process.alive?() + end + + test "no action if not service_role token", %{ + conn: conn, + socket: socket, + tenant: tenant, + jwt_secret: jwt_secret + } do + {token, _} = generate_token(%{role: "authenticated"}, jwt_secret) + + conn = + conn + |> put_req_header("accept", "application/json") + |> put_req_header("apikey", token) + |> delete_req_header("authorization") + |> then(&%{&1 | host: "#{tenant.external_id}.supabase.com"}) + |> delete(~p"/api/disconnect_clients") + + assert conn.status == 401 + assert socket.channel_pid |> Process.alive?() + end + end + defp create_tenant(_) do %{tenant: tenant_fixture()} end + + defp create_socket() do + {_, jwt_secret} = generate_token(%{role: "anon"}) + tenant = tenant_fixture(jwt_secret: jwt_secret) + + create_socket(tenant, jwt_secret) + end + + defp create_socket(tenant, jwt_secret) do + secure_key = Application.fetch_env!(:realtime, :db_enc_key) + + {token, _} = generate_token(%{role: "anon"}, jwt_secret) + + assigns = %{ + tenant: tenant.external_id, + log_level: :info, + postgres_cdc_module: Extensions.PostgresCdcRls, + postgres_extension: tenant.extensions |> hd() |> Map.from_struct() |> Map.get(:settings), + tenant_token: token, + access_token: token, + jwt_secret: Helpers.encrypt!(jwt_secret, secure_key), + limits: %{ + max_concurrent_users: 500, + max_events_per_second: 500, + max_bytes_per_second: 1_000_000, + max_channels_per_client: 100, + max_joins_per_second: 500 + } + } + + {:ok, _, socket} = + RealtimeWeb.UserSocket + |> socket("user_id", assigns) + |> subscribe_and_join(RealtimeWeb.RealtimeChannel, "realtime:#{tenant.external_id}:topic") + + {socket, tenant, jwt_secret} + end end diff --git a/test/support/generators.ex b/test/support/generators.ex index 67dec4f44..3c82b7be1 100644 --- a/test/support/generators.ex +++ b/test/support/generators.ex @@ -2,7 +2,9 @@ defmodule Generators do @moduledoc """ Data genarators for tests. """ + alias Realtime.Helpers alias Realtime.Tenants.Connect + @spec tenant_fixture(map()) :: Realtime.Api.Tenant.t() def tenant_fixture(override \\ %{}) do create_attrs = %{ @@ -194,4 +196,31 @@ defmodule Generators do WITH CHECK ( realtime.channel_name() = '#{name}' ); """ end + + def generate_token(secret) when is_binary(secret) do + generate_token(%{}, secret) + end + + def generate_token(claims) do + generate_token(claims, jwt_secret()) + end + + def generate_token(claims, jwt_secret) do + claims = + %{ + ref: "localhost", + iat: System.system_time(:second), + exp: System.system_time(:second) + 604_800 + } + |> Map.merge(claims) + + signer = Joken.Signer.create("HS256", jwt_secret) + {:ok, token} = Joken.Signer.sign(claims, signer) + {token, jwt_secret} + end + + def jwt_secret() do + secure_key = Application.fetch_env!(:realtime, :db_enc_key) + :crypto.strong_rand_bytes(33) |> Base.encode64() |> Helpers.encrypt!(secure_key) + end end