Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ If you're using the default tenant, the URL is `ws://realtime-dev.localhost:4000
| JANITOR_CLEANUP_MAX_CHILDREN | number | Maximum number of concurrent tasks working on janitor cleanup |
| JANITOR_CLEANUP_CHILDREN_TIMEOUT | number | Timeout for each async task for janitor cleanup |
| JANITOR_CHUNK_SIZE | number | Number of tenants to process per chunk. Each chunk will be processed by a Task |
| MIGRATION_PARTITION_SLOTS | number | Number of dynamic supervisor partitions used by the migrations process |
| METRICS_CLEANER_SCHEDULE_TIMER_IN_MS | number | Time in ms to run the Metric Cleaner task |

## WebSocket URL
Expand Down
4 changes: 4 additions & 0 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ port = System.get_env("DB_PORT", "5432")
db_version = System.get_env("DB_IP_VERSION")
slot_name_suffix = System.get_env("SLOT_NAME_SUFFIX")

migration_partition_slots =
System.get_env("MIGRATION_PARTITION_SLOTS", "#{System.schedulers_online() * 2}") |> String.to_integer()

if !(db_version in [nil, "ipv6", "ipv4"]),
do: raise("Invalid IP version, please set either ipv6 or ipv4")

Expand All @@ -31,6 +34,7 @@ socket_options =
end

config :realtime,
migration_partition_slots: migration_partition_slots,
tenant_max_bytes_per_second: System.get_env("TENANT_MAX_BYTES_PER_SECOND", "100000") |> String.to_integer(),
tenant_max_channels_per_client: System.get_env("TENANT_MAX_CHANNELS_PER_CLIENT", "100") |> String.to_integer(),
tenant_max_concurrent_users: System.get_env("TENANT_MAX_CONCURRENT_USERS", "200") |> String.to_integer(),
Expand Down
5 changes: 4 additions & 1 deletion lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ defmodule Realtime.Application do

use Application
require Logger

alias Realtime.Repo.Replica

defmodule JwtSecretError, do: defexception([:message])
defmodule JwtClaimValidatorsError, do: defexception([:message])

Expand Down Expand Up @@ -48,6 +50,7 @@ defmodule Realtime.Application do

region = Application.get_env(:realtime, :region)
:syn.join(RegionNodes, region, self(), node: node())
migration_partition_slots = Application.get_env(:realtime, :migration_partition_slots)

children =
[
Expand All @@ -70,7 +73,7 @@ defmodule Realtime.Application do
child_spec: {DynamicSupervisor, max_restarts: 0},
strategy: :one_for_one,
name: Realtime.Tenants.Migrations.DynamicSupervisor,
partitions: System.schedulers_online() * 2},
partitions: migration_partition_slots},
{PartitionSupervisor,
child_spec: DynamicSupervisor, strategy: :one_for_one, name: Realtime.Tenants.Connect.DynamicSupervisor},
{PartitionSupervisor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandler do
alias RealtimeWeb.Presence
alias RealtimeWeb.RealtimeChannel.Logging

@spec handle(map(), Phoenix.Socket.t()) ::
{:noreply, Phoenix.Socket.t()} | {:reply, :error | :ok, Phoenix.Socket.t()}
@spec handle(map(), Phoenix.Socket.t()) :: {:reply, :error | :ok, Phoenix.Socket.t()}
def handle(%{"event" => event} = payload, socket) do
event = String.downcase(event, :ascii)

case handle_presence_event(event, payload, socket) do
{:ok, socket} ->
{:noreply, socket}
{:reply, :ok, socket}

{:error, socket} ->
{:noreply, socket}
{:reply, :error, socket}
end
end

Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.34.38",
version: "2.34.39",
elixir: "~> 1.17.3",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
14 changes: 12 additions & 2 deletions test/e2e/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ describe("presence extension", () => {
let supabase = await createClient(url, token, { realtime });

let result: any = [];
let error = null;
let topic = "topic:" + crypto.randomUUID();
let message = crypto.randomUUID();
let key = crypto.randomUUID();
Expand All @@ -145,7 +146,10 @@ describe("presence extension", () => {
)
.subscribe(async (status: string) => {
if (status == "SUBSCRIBED") {
await channel.track(expectedPayload);
const res = await channel.track(expectedPayload, { timeout: 1000 });
if (res == "timed out") {
error = res;
}
}
});

Expand All @@ -155,6 +159,7 @@ describe("presence extension", () => {
let presences = result[0].newPresences[0];
assertEquals(result[0].key, key);
assertEquals(presences.message, message);
assertEquals(error, null);
});

it("user is able to receive presence updates on private channels", async () => {
Expand All @@ -163,6 +168,7 @@ describe("presence extension", () => {
await supabase.realtime.setAuth();

let result: any = [];
let error = null;
let topic = "topic:" + crypto.randomUUID();
let message = crypto.randomUUID();
let key = crypto.randomUUID();
Expand All @@ -178,7 +184,10 @@ describe("presence extension", () => {
)
.subscribe(async (status: string) => {
if (status == "SUBSCRIBED") {
await channel.track(expectedPayload);
const res = await channel.track(expectedPayload, { timeout: 1000 });
if (res == "timed out") {
error = res;
}
}
});

Expand All @@ -188,6 +197,7 @@ describe("presence extension", () => {
let presences = result[0].newPresences[0];
assertEquals(result[0].key, key);
assertEquals(presences.message, message);
assertEquals(error, null);
});
});

Expand Down
3 changes: 2 additions & 1 deletion test/integration/rt_channel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,9 @@ defmodule Realtime.Integration.RtChannelTest do

WebsocketClient.send_event(socket, topic, "presence", payload)

assert_receive %Phoenix.Socket.Message{topic: ^topic, event: "phx_reply", payload: %{"status" => "ok"}}
assert_receive %Message{event: "presence_diff", payload: %{"joins" => joins, "leaves" => %{}}, topic: ^topic}
refute_receive %Message{topic: ^topic}

join_payload = joins |> Map.values() |> hd() |> get_in(["metas"]) |> hd()
assert get_in(join_payload, ["name"]) == payload.payload.name
assert get_in(join_payload, ["t"]) == payload.payload.t
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,14 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
socket =
socket_fixture(tenant, topic, db_conn, key, %Policies{presence: %PresencePolicies{read: true, write: true}})

PresenceHandler.handle(%{"event" => "track"}, socket)
assert {:reply, :ok, socket} = PresenceHandler.handle(%{"event" => "track"}, socket)
topic = "realtime:#{topic}"
assert_receive %Broadcast{topic: ^topic, event: "presence_diff", payload: %{joins: joins, leaves: %{}}}
assert Map.has_key?(joins, key)

PresenceHandler.handle(%{"event" => "track", "payload" => %{"content" => random_string()}}, socket)
assert {:reply, :ok, _socket} =
PresenceHandler.handle(%{"event" => "track", "payload" => %{"content" => random_string()}}, socket)

assert_receive %Broadcast{topic: ^topic, event: "presence_diff", payload: %{joins: joins, leaves: %{}}}
assert Map.has_key?(joins, key)
refute_receive :_
Expand All @@ -76,28 +78,24 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
false
)

PresenceHandler.handle(%{"event" => "track"}, socket)
assert {:reply, :ok, _socket} = PresenceHandler.handle(%{"event" => "track"}, socket)
topic = "realtime:#{topic}"
assert_receive %Broadcast{topic: ^topic, event: "presence_diff", payload: %{joins: joins, leaves: %{}}}
assert Map.has_key?(joins, key)
end

test "user can untrack when they want", %{
tenant: tenant,
topic: topic,
db_conn: db_conn
} do
test "user can untrack when they want", %{tenant: tenant, topic: topic, db_conn: db_conn} do
key = random_string()

socket =
socket_fixture(tenant, topic, db_conn, key, %Policies{presence: %PresencePolicies{read: true, write: true}})

PresenceHandler.handle(%{"event" => "track"}, socket)
assert {:reply, :ok, socket} = PresenceHandler.handle(%{"event" => "track"}, socket)
topic = "realtime:#{topic}"
assert_receive %Broadcast{topic: ^topic, event: "presence_diff", payload: %{joins: joins, leaves: %{}}}
assert Map.has_key?(joins, key)

PresenceHandler.handle(%{"event" => "untrack"}, socket)
assert {:reply, :ok, _socket} = PresenceHandler.handle(%{"event" => "untrack"}, socket)
assert_receive %Broadcast{topic: ^topic, event: "presence_diff", payload: %{joins: %{}, leaves: leaves}}
assert Map.has_key?(leaves, key)
end
Expand All @@ -111,8 +109,11 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do

for _ <- 1..100, reduce: socket do
socket ->
{:noreply, socket} =
PresenceHandler.handle(%{"event" => "track", "payload" => %{"metadata" => random_string()}}, socket)
assert {:reply, :ok, socket} =
PresenceHandler.handle(
%{"event" => "track", "payload" => %{"metadata" => random_string()}},
socket
)

assert_receive %Broadcast{topic: ^topic, event: "presence_diff"}
socket
Expand All @@ -133,8 +134,11 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do

for _ <- 1..100, reduce: socket do
socket ->
{:noreply, socket} =
PresenceHandler.handle(%{"event" => "track", "payload" => %{"metadata" => random_string()}}, socket)
assert {:reply, :ok, socket} =
PresenceHandler.handle(
%{"event" => "track", "payload" => %{"metadata" => random_string()}},
socket
)

assert_receive %Broadcast{topic: ^topic, event: "presence_diff"}
socket
Expand All @@ -151,6 +155,7 @@ defmodule RealtimeWeb.RealtimeChannel.PresenceHandlerTest do
start_supervised(CurrentTime.Mock)

tenant = Containers.checkout_tenant(true)

RateCounter.stop(tenant.external_id)
GenCounter.stop(tenant.external_id)
RateCounter.new(tenant.external_id)
Expand Down
Loading