Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions lib/realtime/monitoring/prom_ex/plugins/tenant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,13 @@ defmodule Realtime.PromEx.Plugins.Tenant do
cluster_count = UsersCounter.tenant_users(t)
tenant = Tenants.Cache.get_tenant_by_external_id(t)

Telemetry.execute(
[:realtime, :connections],
%{connected: count, connected_cluster: cluster_count, limit: tenant.max_concurrent_users},
%{tenant: t}
)
if tenant != nil do
Telemetry.execute(
[:realtime, :connections],
%{connected: count, connected_cluster: cluster_count, limit: tenant.max_concurrent_users},
%{tenant: t}
)
end
end
end

Expand Down
29 changes: 29 additions & 0 deletions lib/realtime_web/controllers/tenant_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,35 @@ defmodule RealtimeWeb.TenantController do
end
end

operation(:create,
summary: "Create or update tenant",
parameters: [
token: [
in: :header,
name: "Authorization",
schema: %OpenApiSpex.Schema{type: :string},
required: true,
example:
"Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJpYXQiOjE2ODAxNjIxNTR9.U9orU6YYqXAtpF8uAiw6MS553tm4XxRzxOhz2IwDhpY"
]
],
request_body: TenantParams.params(),
responses: %{
200 => TenantResponse.response(),
403 => EmptyResponse.response()
}
)

@spec create(any(), map()) :: any()
def create(conn, %{"tenant" => params}) do
external_id = Map.get(params, "external_id")

case Tenant.changeset(%Tenant{}, params) do
%{valid?: true} -> update(conn, %{"tenant_id" => external_id, "tenant" => params})
changeset -> changeset
end
end

operation(:update,
summary: "Create or update tenant",
parameters: [
Expand Down
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.34.49",
version: "2.34.50",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down Expand Up @@ -98,6 +98,7 @@ defmodule Realtime.MixProject do
"ecto.setup": ["ecto.create", "ecto.migrate", "run priv/repo/seeds.exs"],
"ecto.reset": ["ecto.drop", "ecto.setup"],
test: [
"cmd epmd -daemon",
"ecto.create --quiet",
"run priv/repo/seeds_before_migration.exs",
"ecto.migrate --migrations-path=priv/repo/migrations",
Expand Down
39 changes: 21 additions & 18 deletions test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ defmodule Realtime.Integration.RtChannelTest do
alias Postgrex
alias Realtime.Database
alias Realtime.GenCounter
alias Realtime.Integration.RtChannelTest.Endpoint
alias Realtime.Integration.WebsocketClient
alias Realtime.RateCounter
alias Realtime.Tenants
Expand All @@ -29,18 +28,7 @@ defmodule Realtime.Integration.RtChannelTest do
@uri "ws://#{@external_id}.localhost:#{@port}/socket/websocket"
@secret "secure_jwt_secret"

Application.put_env(:phoenix, Endpoint,
https: false,
http: [port: @port],
debug_errors: false,
server: true,
pubsub_server: __MODULE__,
secret_key_base: String.duplicate("a", 64)
)

Application.delete_env(:joken, :current_time_adapter)

defmodule Endpoint do
defmodule TestEndpoint do
use Phoenix.Endpoint, otp_app: :phoenix

@session_config store: :cookie,
Expand Down Expand Up @@ -68,12 +56,24 @@ defmodule Realtime.Integration.RtChannelTest do
end
end

Application.put_env(:phoenix, TestEndpoint,
https: false,
http: [port: @port],
debug_errors: false,
server: true,
pubsub_server: __MODULE__,
secret_key_base: String.duplicate("a", 64)
)

Application.delete_env(:joken, :current_time_adapter)

defmodule Token do
use Joken.Config
end

setup do
RateCounter.stop(@external_id)
GenCounter.stop(@external_id)
Cache.invalidate_tenant_cache(@external_id)
Process.sleep(500)

Expand All @@ -83,7 +83,7 @@ defmodule Realtime.Integration.RtChannelTest do
end

setup_all do
capture_log(fn -> start_supervised!(Endpoint) end)
capture_log(fn -> start_supervised!(TestEndpoint) end)
start_supervised!({Phoenix.PubSub, name: __MODULE__})
:ok
end
Expand Down Expand Up @@ -300,21 +300,24 @@ defmodule Realtime.Integration.RtChannelTest do
end

@tag policies: [:authenticated_read_broadcast_and_presence]
test "private broadcast with valid channel no write permissions won't send message but will receive message",
%{topic: topic} do
test "private broadcast with valid channel no write permissions won't send message but will receive message", %{
topic: topic
} do
config = %{broadcast: %{self: true}, private: true}
topic = "realtime:#{topic}"

{service_role_socket, _} = get_connection("service_role")

WebsocketClient.join(service_role_socket, topic, %{config: config})
assert_receive %Message{event: "phx_reply", topic: ^topic}, 500
assert_receive %Message{event: "presence_state"}, 500
assert_receive %Message{event: "presence_state"}, 1000

{socket, _} = get_connection("authenticated")
WebsocketClient.join(socket, topic, %{config: config})
assert_receive %Message{event: "phx_reply", topic: ^topic}, 500
assert_receive %Message{event: "presence_state"}, 500
assert_receive %Message{event: "presence_state"}, 1000

Process.sleep(1000)

payload = %{"event" => "TEST", "payload" => %{"msg" => 1}, "type" => "broadcast"}
WebsocketClient.send_event(socket, topic, "broadcast", payload)
Expand Down
1 change: 1 addition & 0 deletions test/realtime/database_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ defmodule Realtime.DatabaseTest do
end)
end

@tag db_pool: 1
test "on checkout error, handles raised exception as an error", %{db_conn: db_conn} do
for _ <- 1..5 do
Task.start(fn ->
Expand Down
52 changes: 52 additions & 0 deletions test/realtime/monitoring/prom_ex/plugins/tenant_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
defmodule Realtime.PromEx.Plugins.TenantTest do
use Realtime.DataCase

alias Realtime.PromEx.Plugins.Tenant
alias Realtime.Rpc
alias Realtime.UsersCounter

def handle_telemetry(event, metadata, content, pid: pid), do: send(pid, {event, metadata, content})

@aux_mod (quote do
defmodule FakeUserCounter do
def fake_add(external_id) do
:ok = UsersCounter.add(spawn(fn -> Process.sleep(2000) end), external_id)
end
end
end)

Code.eval_quoted(@aux_mod)

describe "execute_tenant_metrics/0" do
setup do
tenant = Containers.checkout_tenant()
:telemetry.attach(__MODULE__, [:realtime, :connections], &__MODULE__.handle_telemetry/4, pid: self())

on_exit(fn ->
:telemetry.detach(__MODULE__)
Containers.checkin_tenant(tenant)
end)

{:ok, node} = Clustered.start(@aux_mod)
%{tenant: tenant, node: node}
end

test "returns a list of tenant metrics and handles bad tenant ids", %{
tenant: %{external_id: external_id},
node: node
} do
UsersCounter.add(self(), external_id)
# Add bad tenant id
UsersCounter.add(self(), random_string())

_ = Rpc.call(node, FakeUserCounter, :fake_add, [external_id])
Process.sleep(500)
Tenant.execute_tenant_metrics()

assert_receive {[:realtime, :connections], %{connected: 1, limit: 200, connected_cluster: 2},
%{tenant: ^external_id}}

refute_receive :_
end
end
end
Loading
Loading