Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Apr 18, 2024
1 parent 2e594e6 commit db43ea7
Show file tree
Hide file tree
Showing 11 changed files with 157 additions and 38 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ PORT ?= 4000
.PHONY: dev dev.orange seed prod bench.% dev_db start start.% stop stop.% rebuild rebuild.%

dev:
ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=$(PORT) MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev REGION=fra FLY_ALLOC_ID=123e4567-e89b-12d3-a456-426614174000 DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" iex --name $(NODE_NAME)@127.0.0.1 --cookie cookie -S mix phx.server
ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=$(PORT) MIX_ENV=dev SECURE_Case
S=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev REGION=fra FLY_ALLOC_ID=123e4567-e89b-12d3-a456-426614174000 DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" iex --name $(NODE_NAME)@127.0.0.1 --cookie cookie -S mix phx.server

dev.orange:
ELIXIR_ERL_OPTIONS="+hmax 1000000000" SLOT_NAME_SUFFIX=some_sha PORT=4001 MIX_ENV=dev SECURE_CHANNELS=true API_JWT_SECRET=dev METRICS_JWT_SECRET=dev FLY_REGION=fra FLY_ALLOC_ID=123e4567-e89b-12d3-a456-426614174000 DB_ENC_KEY="1234567890123456" CLUSTER_STRATEGIES=$(CLUSTER_STRATEGIES) ERL_AFLAGS="-kernel shell_history enabled" iex --name orange@127.0.0.1 --cookie cookie -S mix phx.server
Expand Down
5 changes: 4 additions & 1 deletion lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,10 @@ defmodule Realtime.Tenants do
"""
@spec disconnect_clients(String.t()) :: :ok
def disconnect_clients(external_id) do
broadcast_operation_event(:disconnect, external_id)
suspend_tenant_by_external_id(external_id)
:timer.sleep(1000)
unsuspend_tenant_by_external_id(external_id)
:ok
end

defp broadcast_operation_event(action, external_id) do
Expand Down
5 changes: 5 additions & 0 deletions lib/realtime/tenants/cache_pub_sub_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,9 @@ defmodule Realtime.Tenants.CachePubSubHandler do
Cache.invalidate_tenant_cache(tenant_id)
{:noreply, state}
end

# Ignore operations
def handle_info({_, _}, state) do
{:noreply, state}
end
end
16 changes: 4 additions & 12 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -147,27 +147,19 @@ defmodule Realtime.Tenants.Connect do
end

def handle_info(
{:disconnect, tenant_id},
{:suspend_tenant, tenant_id},
%{db_conn_pid: db_conn_pid, tenant_id: tenant_id} = state
) do
Logger.info("Tenant has requested to disconnect, database connection will be terminated")
Logger.warning("Tenant was suspended, database connection will be terminated")
:ok = GenServer.stop(db_conn_pid, :normal, 1000)
{:stop, :normal, state}
end

def handle_info({:disconnect, _}, state) do
# Ignore suspend_tenant if they are not related with this tenant
def handle_info({:suspend_tenant, _}, state) do
{:noreply, state}
end

def handle_info(
{:suspend_tenant, tenant_id},
%{db_conn_pid: db_conn_pid, tenant_id: tenant_id} = state
) do
Logger.warning("Tenant was suspended, database connection will be terminated")
:ok = GenServer.stop(db_conn_pid, :normal, 1000)
{:stop, :normal, state}
end

# Ignore unsuspend messages to avoid handle_info unmatched functions
def handle_info({:unsuspend_tenant, _}, state) do
{:noreply, state}
Expand Down
7 changes: 1 addition & 6 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -249,16 +249,11 @@ defmodule RealtimeWeb.RealtimeChannel do
end
end

def handle_info({:suspend_tenant, tenant_id}, %{tenant_id: tenant_id} = state) do
def handle_info({:suspend_tenant, tenant_id}, %{assigns: %{tenant: tenant_id}} = state) do
Logger.info("Tenant suspend, disconencting the socket")
{:stop, :normal, state}
end

def handle_info({:disconnect, tenant_id}, %{tenant_id: tenant_id} = state) do
Logger.info("Socket disconnection requested by the user")
{:stop, :normal, state}
end

# Ignore operations
def handle_info({_, _}, state) do
{:noreply, state}
Expand Down
7 changes: 4 additions & 3 deletions lib/realtime_web/controllers/tenant_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -263,12 +263,13 @@ defmodule RealtimeWeb.TenantController do
}
)

def disconnect_clients(%{assigns: %{tenant: tenant}} = conn, %{"tenant_id" => tenant_id}) do
Tenants.disconnect_clients(tenant_id)
PostgresCdc.stop_all(tenant, @stop_timeout)
def disconnect_clients(%{assigns: %{tenant: tenant, role: "service_role"}} = conn, _) do
:ok = Realtime.Tenants.disconnect_clients(tenant.external_id)
send_resp(conn, 204, "")
end

def disconnect_clients(_, _), do: {:error, :unauthorized}

operation(:health,
summary: "Tenant health",
parameters: [
Expand Down
1 change: 1 addition & 0 deletions lib/realtime_web/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ defmodule RealtimeWeb.Router do
pipe_through([:open_cors, :tenant_api, :secure_tenant_api])

post("/broadcast", BroadcastController, :broadcast)
delete("/disconnect_clients", TenantController, :disconnect_clients)
end

scope "/api", RealtimeWeb do
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.28.28",
version: "2.28.29",
elixir: "~> 1.14.0",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
20 changes: 7 additions & 13 deletions test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -623,20 +623,14 @@ defmodule Realtime.Integration.RtChannelTest do
end
end

defp token_valid(role), do: generate_token(%{role: role})
defp token_no_role(), do: generate_token()

defp generate_token(claims \\ %{}) do
claims =
%{
ref: "localhost",
iat: System.system_time(:second),
exp: System.system_time(:second) + 604_800
}
|> Map.merge(claims)
defp token_valid(role) do
{token, _} = generate_token(%{role: role}, @secret)
{:ok, token}
end

signer = Joken.Signer.create("HS256", @secret)
Joken.Signer.sign(claims, signer)
defp token_no_role() do
{token, _} = generate_token(@secret)
{:ok, token}
end

defp get_connection(role \\ "anon") do
Expand Down
100 changes: 99 additions & 1 deletion test/realtime_web/controllers/tenant_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule RealtimeWeb.TenantControllerTest do
use RealtimeWeb.ConnCase, async: false

import Mock
import Phoenix.ChannelTest
import Realtime.Helpers, only: [encrypt!: 2, transaction: 2]

alias Realtime.Api.Tenant
Expand All @@ -15,6 +16,7 @@ defmodule RealtimeWeb.TenantControllerTest do

alias RealtimeWeb.ChannelsAuthorization
alias RealtimeWeb.JwtVerification
alias RealtimeWeb.Joken.CurrentTime

@update_attrs %{
jwt_secret: "some updated jwt_secret",
Expand Down Expand Up @@ -135,7 +137,7 @@ defmodule RealtimeWeb.TenantControllerTest do
setup %{tenant: tenant} do
[extension] = tenant.extensions
args = Map.put(extension.settings, "id", tenant.external_id)
{:ok, _} = Realtime.PostgresCdc.connect(Extensions.PostgresCdcStream, args)
Realtime.PostgresCdc.connect(Extensions.PostgresCdcStream, args)
on_exit(fn -> Realtime.PostgresCdc.stop_all(tenant) end)
:ok
end
Expand Down Expand Up @@ -287,7 +289,103 @@ defmodule RealtimeWeb.TenantControllerTest do
end
end

describe "disconnect clients" do
setup do
start_supervised!(CurrentTime.Mock)

Application.put_env(:realtime, :db_enc_key, "1234567890123456")
{socket, tenant, jwt_secret} = create_socket()
{socket_2, _tenant_2, _jwt_secret_2} = create_socket()
%{socket: socket, socket_2: socket_2, tenant: tenant, jwt_secret: jwt_secret}
end

test "disconnects clients when disconnect clients is called and then it's able to reconnect",
%{
conn: conn,
socket: socket,
socket_2: socket_2,
tenant: tenant,
jwt_secret: jwt_secret
} do
{token, _} = generate_token(%{role: "service_role"}, jwt_secret)

conn =
conn
|> put_req_header("accept", "application/json")
|> put_req_header("apikey", token)
|> delete_req_header("authorization")
|> then(&%{&1 | host: "#{tenant.external_id}.supabase.com"})
|> delete(~p"/api/disconnect_clients")

assert conn.status == 204
:timer.sleep(1000)
refute socket.channel_pid |> Process.alive?()
assert socket_2.channel_pid |> Process.alive?()
:timer.sleep(1000)

{socket, _tenant, _jwt_secret} = create_socket(tenant, jwt_secret)
assert socket.channel_pid |> Process.alive?()
end

test "no action if not service_role token", %{
conn: conn,
socket: socket,
tenant: tenant,
jwt_secret: jwt_secret
} do
{token, _} = generate_token(%{role: "authenticated"}, jwt_secret)

conn =
conn
|> put_req_header("accept", "application/json")
|> put_req_header("apikey", token)
|> delete_req_header("authorization")
|> then(&%{&1 | host: "#{tenant.external_id}.supabase.com"})
|> delete(~p"/api/disconnect_clients")

assert conn.status == 401
assert socket.channel_pid |> Process.alive?()
end
end

defp create_tenant(_) do
%{tenant: tenant_fixture()}
end

defp create_socket() do
{_, jwt_secret} = generate_token(%{role: "anon"})
tenant = tenant_fixture(jwt_secret: jwt_secret)

create_socket(tenant, jwt_secret)
end

defp create_socket(tenant, jwt_secret) do
secure_key = Application.fetch_env!(:realtime, :db_enc_key)

{token, _} = generate_token(%{role: "anon"}, jwt_secret)

assigns = %{
tenant: tenant.external_id,
log_level: :info,
postgres_cdc_module: Extensions.PostgresCdcRls,
postgres_extension: tenant.extensions |> hd() |> Map.from_struct() |> Map.get(:settings),
tenant_token: token,
access_token: token,
jwt_secret: Helpers.encrypt!(jwt_secret, secure_key),
limits: %{
max_concurrent_users: 500,
max_events_per_second: 500,
max_bytes_per_second: 1_000_000,
max_channels_per_client: 100,
max_joins_per_second: 500
}
}

{:ok, _, socket} =
RealtimeWeb.UserSocket
|> socket("user_id", assigns)
|> subscribe_and_join(RealtimeWeb.RealtimeChannel, "realtime:#{tenant.external_id}:topic")

{socket, tenant, jwt_secret}
end
end
29 changes: 29 additions & 0 deletions test/support/generators.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ defmodule Generators do
@moduledoc """
Data genarators for tests.
"""
alias Realtime.Helpers
alias Realtime.Tenants.Connect

@spec tenant_fixture(map()) :: Realtime.Api.Tenant.t()
def tenant_fixture(override \\ %{}) do
create_attrs = %{
Expand Down Expand Up @@ -194,4 +196,31 @@ defmodule Generators do
WITH CHECK ( realtime.channel_name() = '#{name}' );
"""
end

def generate_token(secret) when is_binary(secret) do
generate_token(%{}, secret)
end

def generate_token(claims) do
generate_token(claims, jwt_secret())
end

def generate_token(claims, jwt_secret) do
claims =
%{
ref: "localhost",
iat: System.system_time(:second),
exp: System.system_time(:second) + 604_800
}
|> Map.merge(claims)

signer = Joken.Signer.create("HS256", jwt_secret)
{:ok, token} = Joken.Signer.sign(claims, signer)
{token, jwt_secret}
end

def jwt_secret() do
secure_key = Application.fetch_env!(:realtime, :db_enc_key)
:crypto.strong_rand_bytes(33) |> Base.encode64() |> Helpers.encrypt!(secure_key)
end
end

0 comments on commit db43ea7

Please sign in to comment.