Skip to content

Commit

Permalink
fix: Add endpoint to disconnect all users for a given tenant
Browse files Browse the repository at this point in the history
  • Loading branch information
filipecabaco committed Apr 18, 2024
1 parent ff41087 commit 2e594e6
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 12 deletions.
14 changes: 9 additions & 5 deletions lib/realtime/tenants.ex
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,15 @@ defmodule Realtime.Tenants do
|> tap(fn _ -> broadcast_operation_event(:unsuspend_tenant, external_id) end)
end

@doc """
Disconnects all clients from a tenant
"""
@spec disconnect_clients(String.t()) :: :ok
def disconnect_clients(external_id) do
broadcast_operation_event(:disconnect, external_id)
end

defp broadcast_operation_event(action, external_id) do
Phoenix.PubSub.broadcast!(
Realtime.PubSub,
"realtime:operations:invalidate_cache",
{action, external_id}
)
Phoenix.PubSub.broadcast!(Realtime.PubSub, "realtime:operations", {action, external_id})
end
end
2 changes: 1 addition & 1 deletion lib/realtime/tenants/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ defmodule Realtime.Tenants.Cache do
def distributed_invalidate_tenant_cache(tenant_id) when is_binary(tenant_id) do
Phoenix.PubSub.broadcast!(
Realtime.PubSub,
"realtime:operations:invalidate_cache",
"realtime:operations",
{:invalidate_cache, tenant_id}
)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/realtime/tenants/cache_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Realtime.Tenants.CacheSupervisor do
@impl true
def init(_init_arg) do
children = [
{CachePubSubHandler, topics: ["realtime:operations:invalidate_cache"]},
{CachePubSubHandler, topics: ["realtime:operations"]},
Cache
]

Expand Down
20 changes: 18 additions & 2 deletions lib/realtime/tenants/connect.ex
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ defmodule Realtime.Tenants.Connect do
connected_users_bucket: connected_users_bucket
} = state
) do
:ok = Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations:invalidate_cache")
:ok = Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations")
send_connected_user_check_message(connected_users_bucket, check_connected_user_interval)

{:noreply, state}
Expand Down Expand Up @@ -146,7 +146,23 @@ defmodule Realtime.Tenants.Connect do
{:stop, :normal, state}
end

def handle_info({:suspend_tenant, _}, %{db_conn_pid: db_conn_pid} = state) do
def handle_info(
{:disconnect, tenant_id},
%{db_conn_pid: db_conn_pid, tenant_id: tenant_id} = state
) do
Logger.info("Tenant has requested to disconnect, database connection will be terminated")
:ok = GenServer.stop(db_conn_pid, :normal, 1000)
{:stop, :normal, state}
end

def handle_info({:disconnect, _}, state) do
{:noreply, state}
end

def handle_info(
{:suspend_tenant, tenant_id},
%{db_conn_pid: db_conn_pid, tenant_id: tenant_id} = state
) do
Logger.warning("Tenant was suspended, database connection will be terminated")
:ok = GenServer.stop(db_conn_pid, :normal, 1000)
{:stop, :normal, state}
Expand Down
17 changes: 17 additions & 0 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ defmodule RealtimeWeb.RealtimeChannel do
db_conn: db_conn
}

:ok = Phoenix.PubSub.subscribe(Realtime.PubSub, "realtime:operations")

{:ok, state, assign(socket, assigns)}
else
{:error, [message: "Invalid token", claim: _claim, claim_val: _value]} = error ->
Expand Down Expand Up @@ -247,6 +249,21 @@ defmodule RealtimeWeb.RealtimeChannel do
end
end

def handle_info({:suspend_tenant, tenant_id}, %{tenant_id: tenant_id} = state) do
Logger.info("Tenant suspend, disconencting the socket")
{:stop, :normal, state}
end

def handle_info({:disconnect, tenant_id}, %{tenant_id: tenant_id} = state) do
Logger.info("Socket disconnection requested by the user")
{:stop, :normal, state}
end

# Ignore operations
def handle_info({_, _}, state) do
{:noreply, state}
end

def handle_info(msg, socket) do
Logger.error("HANDLE_INFO message not handled: " <> inspect(msg))
{:noreply, socket}
Expand Down
25 changes: 25 additions & 0 deletions lib/realtime_web/controllers/tenant_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,31 @@ defmodule RealtimeWeb.TenantController do
end
end

operation(:disconnect_clients,
summary: "Disconnect all clients",
parameters: [
token: [
in: :header,
name: "Authorization",
schema: %OpenApiSpex.Schema{type: :string},
required: true,
example:
"Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2ODAxNjIxNTR9.U9orU6YYqXAtpF8uAiw6MS553tm4XxRzxOhz2IwDhpY"
],
tenant_id: [in: :path, description: "Tenant ID", type: :string]
],
responses: %{
200 => TenantHealthResponse.response(),
403 => EmptyResponse.response()
}
)

def disconnect_clients(%{assigns: %{tenant: tenant}} = conn, %{"tenant_id" => tenant_id}) do
Tenants.disconnect_clients(tenant_id)
PostgresCdc.stop_all(tenant, @stop_timeout)
send_resp(conn, 204, "")
end

operation(:health,
summary: "Tenant health",
parameters: [
Expand Down
2 changes: 2 additions & 0 deletions lib/realtime_web/plugs/assign_tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ defmodule RealtimeWeb.Plugs.AssignTenant do
|> tap(&GenCounter.add(Tenants.requests_per_second_key(&1)))
|> Api.preload_counters()

Logger.metadata(external_id: external_id, project: external_id)

assign(conn, :tenant, tenant)
else
{:error, :tenant_not_found_in_host} ->
Expand Down
2 changes: 1 addition & 1 deletion test/realtime/tenants/cache_supervisor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ defmodule Realtime.Tenants.CacheSupervisorTest do
# PubSub message
Phoenix.PubSub.broadcast(
Realtime.PubSub,
"realtime:operations:invalidate_cache",
"realtime:operations",
{:suspend_tenant, external_id}
)

Expand Down
4 changes: 2 additions & 2 deletions test/realtime/tenants_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ defmodule Realtime.TenantsTest do
describe "suspend_tenant_by_external_id/1" do
setup do
tenant = tenant_fixture()
topic = "realtime:operations:invalidate_cache"
topic = "realtime:operations"
Phoenix.PubSub.subscribe(Realtime.PubSub, topic)
%{topic: topic, tenant: tenant}
end
Expand All @@ -58,7 +58,7 @@ defmodule Realtime.TenantsTest do
describe "unsuspend_tenant_by_external_id/1" do
setup do
tenant = tenant_fixture()
topic = "realtime:operations:invalidate_cache"
topic = "realtime:operations"
Phoenix.PubSub.subscribe(Realtime.PubSub, topic)
%{topic: topic, tenant: tenant}
end
Expand Down

0 comments on commit 2e594e6

Please sign in to comment.