Skip to content

Commit

Permalink
Release (#463)
Browse files Browse the repository at this point in the history
* fix: don't update syn meta if the node is mismatched (#433)

* update multiplayer demo MAX_ROOM_USERS to 50

* fix: drop LHR and GRU (#436)

* feat: footer improvements (#429)

* feat: display the count of connected tenants in metrics (#418)

* fix: don't try to stop the not alive pid in the resolver (#417)

* fix: increase read replica pool size

* feat: default realtime-dev JWT secret to API_JWT_SECRET env var value

* chore(deps): bump json5 from 1.0.1 to 1.0.2 in /demo

Bumps [json5](https://github.com/json5/json5) from 1.0.1 to 1.0.2.
- [Release notes](https://github.com/json5/json5/releases)
- [Changelog](https://github.com/json5/json5/blob/main/CHANGELOG.md)
- [Commits](json5/json5@v1.0.1...v1.0.2)

---
updated-dependencies:
- dependency-name: json5
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>

* feat: default Repo with_dynamic_repo name to nil and pool_size to 1 (#423)

* fix: looks for the PR number after # symbol (#425)

* Add defaults to current fields to reflect db schema

* Add max_bytes_per_second and max_channels_per_client fields

* Add max_joins_per_second to schema

* Add db limits to socket

* Use db limits in channel

* fix: tests

* fix: do not block a registration process (#426)

* fix: add http checks

* feat: stop connection to a tenant's db if there are no connected users for 10 minutes. (#428)

* feat: add short_node_id helper

* Use new helper in Realtime.PromEx

* feat: add version to footer and use short node id

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: abc3 <sts@abc3.dev>
Co-authored-by: Wen Bo Xie <wenbo.xie3@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Wen Bo Xie <5532241+w3b6x9@users.noreply.github.com>

* fix: switch to aws roles

* chore: update staging branch to main and prod branch to release

* Add sjc replica (#441)

* fix: load sjc replica from DB_HOST_REPLICA_SJC env var (#443)

* Version is compiled now (#444)

* Bump version

* fix: replace replication poller process sleep with send_after (#447)

* feat: show latency on db events on inspector (#450)

* Show latency on db events on inspector

* Round to seconds for now

* fix: prepend inspector latency with tilde (#451)

* fix: update SLOT_NAME_SUFFIX to capture first 7 commit SHA chars

* docs: add WebSocket URL for local and prod in README

* fix: make rlimit configurable by env var (#460)

* fix: make rlimit configurable by env var

* chore: fix unbound variable

* feat: add in op for filter and enable filter on delete records (#461)

* feat: add in op when filtering for values

* feat: enable filtering on delete records

* High res commit timestamps

* High res latency on Postgres payloads

* toFixed(1)

* Footer precision to match multiplayer.dev

* Init footer latency to 0.0

* Use ISO8601 with millis

* fix: update walrus db change ms timestamp migration

* fix: grammar/spelling changes (#465)

* Grammar/spelling changes

* "realtime" not "real-time"

* Revert

* Support `in` filter when parsing subscription params

* Use PostgREST syntax

* Fix test

* feat: wrap 'in' filter value with curly brackets

* `bad` is not a real filter, why use it as an example?

* Sort region nodes by node name

* Adds a doc

* Consolidate launch_node fn and add logging

* feat: use regex for 'in' filter parentheses value

* Map "sea" to SJC database

* Change regions map `fra` to `lhr` and `sjc` to `sea`

* Use node short id and put region on status page

* Handle nohost for tests

* Include region in latency payloads

* Remove extra Logger statement

* Short node name only needed with an ipv6 address

* Usage Logger server init

* Telem usage logger

* Working

* Make counting more obvious in realtime channel

* Metric for Channel events

* Optionally execute telemetry from a `RateCounter`

* Cleanup

* Cleanup

* List tenants for a node only

* Start Realtime.Tenants module

* Change :all limiter key to :plug because that's what it is

* Fix test

* Changes limiter keys to be more obvious

* Fix tests

* Unused variables

* feat: admin tenants index (#442)

* feat: list and sort tenants
* Reactive filter and sorting is working
* Show important limits on tenants admin list

* fix: log badrpc reason

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: abc3 <sts@abc3.dev>
Co-authored-by: Wen Bo Xie <wenbo.xie3@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Wen Bo Xie <5532241+w3b6x9@users.noreply.github.com>
Co-authored-by: Inian <inian1234@gmail.com>
Co-authored-by: Han Qiao <qiao@supabase.io>
  • Loading branch information
7 people authored Feb 6, 2023
1 parent f6bb0c2 commit 36c8a4b
Show file tree
Hide file tree
Showing 41 changed files with 1,550 additions and 193 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
- uses: actions/checkout@v2
- uses: superfly/flyctl-actions@1.1
with:
args: "-c deploy/fly/prod.toml deploy --build-arg SLOT_NAME_SUFFIX=${GITHUB_SHA::6}"
args: "-c deploy/fly/prod.toml deploy --build-arg SLOT_NAME_SUFFIX=${GITHUB_SHA::7}"
env:
FLY_API_TOKEN: ${{ secrets.FLY_API_TOKEN }}

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/staging.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ jobs:
- uses: actions/checkout@v2
- uses: superfly/flyctl-actions@1.1
with:
args: "-c deploy/fly/staging.toml deploy --build-arg SLOT_NAME_SUFFIX=${GITHUB_SHA::6}"
args: "-c deploy/fly/staging.toml deploy --build-arg SLOT_NAME_SUFFIX=${GITHUB_SHA::7}"
2 changes: 1 addition & 1 deletion .github/workflows/staging_supabench.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
args: "-c deploy/fly/qa.toml scale count 1"
- uses: superfly/flyctl-actions@1.1
with:
args: "-c deploy/fly/qa.toml deploy --build-arg SLOT_NAME_SUFFIX=${GITHUB_SHA::6}"
args: "-c deploy/fly/qa.toml deploy --build-arg SLOT_NAME_SUFFIX=${GITHUB_SHA::7}"
- uses: superfly/flyctl-actions@1.1
if: always()
with:
Expand Down
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ COPY --from=tailscale /app/tailscaled /tailscale/tailscaled
COPY --from=tailscale /app/tailscale /tailscale/tailscale
RUN mkdir -p /var/run/tailscale /var/cache/tailscale /var/lib/tailscale

ENV RLIMIT_NOFILE 100000
COPY limits.sh /app/limits.sh
ENTRYPOINT ["/usr/bin/tini", "-s", "-g", "--", "/app/limits.sh"]

Expand Down
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,15 @@ DB_POOL_SIZE # {string} Sets the number of connections in the
SLOT_NAME_SUFFIX # {string} This is appended to the replication slot which allows making a custom slot name. May contain lowercase letters, numbers, and the underscore character. Together with the default `supabase_realtime_replication_slot`, slot name should be up to 64 characters long.
```

## Websocket Connection Authorization
## WebSocket URL

Websocket connections are authorized via symmetric JWT verification. Only supports JWTs signed with the following algorithms:
The WebSocket URL is in the following format for local development: `ws://[external_id].localhost:4000/socket/websocket`

If you're using Supabase's hosted Realtime in production the URL is `wss://[project-ref].supabase.co/realtime/v1/websocket?apikey=[anon-token]&log_level=info&vsn=1.0.0"`

## WebSocket Connection Authorization

WebSocket connections are authorized via symmetric JWT verification. Only supports JWTs signed with the following algorithms:

- HS256
- HS384
Expand All @@ -165,7 +171,7 @@ Verify JWT claims by setting JWT_CLAIM_VALIDATORS:
>
> Then JWT's "iss" value must equal "Issuer" and "nbf" value must equal 1610078130.
> **Note:**
**Note:**
> JWT expiration is checked automatically. `exp` and `role` (database role) keys are mandatory.
**Authorizing Client Connection**: You can pass in the JWT by following the instructions under the Realtime client lib. For example, refer to the **Usage** section in the [@supabase/realtime-js](https://github.com/supabase/realtime-js) client library.
Expand Down
9 changes: 5 additions & 4 deletions assets/js/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,17 @@ Hooks.payload = {

// Listen for all (`*`) `postgres_changes` events on tables in the `public` schema
this.channel.on("postgres_changes", { event: "*", schema: schema, table: table }, payload => {
let ts = new Date();
let ts = performance.now() + performance.timeOrigin
let iso_ts = new Date()
let payload_ts = Date.parse(payload.commit_timestamp)
let latency = (ts.getTime() / 1000) - (payload_ts / 1000)
let latency = ts - payload_ts
let line =
`<tr class="bg-white border-b hover:bg-gray-50">
<td class="py-4 px-6">POSTGRES</td>
<td class="py-4 px-6">${ts.toISOString()}</td>
<td class="py-4 px-6">${iso_ts.toISOString()}</td>
<td class="py-4 px-6">
<div class="pb-3">${JSON.stringify(payload)}</div>
<div class="pt-3 border-t hover:bg-gray-50">Latency: ${Math.round(latency)} sec</div>
<div class="pt-3 border-t hover:bg-gray-50">Latency: ${latency.toFixed(1)} ms</div>
</td>
</tr>`
let list = document.querySelector("#plist")
Expand Down
16 changes: 1 addition & 15 deletions lib/extensions/postgres_cdc_rls/cdc_rls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ defmodule Extensions.PostgresCdcRls do

def start_distributed(%{"region" => region, "id" => tenant} = args) do
fly_region = PostgresCdc.aws_to_fly(region)
launch_node = launch_node(tenant, fly_region, node())
launch_node = PostgresCdc.launch_node(tenant, fly_region, node())

Logger.warning(
"Starting distributed postgres extension #{inspect(lauch_node: launch_node, region: region, fly_region: fly_region)}"
Expand Down Expand Up @@ -136,20 +136,6 @@ defmodule Extensions.PostgresCdcRls do
end
end

def launch_node(tenant, fly_region, default) do
case PostgresCdc.region_nodes(fly_region) do
[_ | _] = regions_nodes ->
member_count = Enum.count(regions_nodes)
index = :erlang.phash2(tenant, member_count)
{_, [node: launch_node]} = Enum.at(regions_nodes, index)
launch_node

_ ->
Logger.warning("Didn't find launch_node, return default #{inspect(default)}")
default
end
end

def create_subscription(conn, publication, opts, timeout \\ 5_000) do
conn_node = node(conn)

Expand Down
41 changes: 34 additions & 7 deletions lib/extensions/postgres_cdc_rls/subscriptions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do

@type conn() :: Postgrex.conn()

@filter_types ["eq", "neq", "lt", "lte", "gt", "gte"]
@filter_types ["eq", "neq", "lt", "lte", "gt", "gte", "in"]

@spec create(conn(), String.t(), list(map())) ::
{:ok, Postgrex.Result.t()}
Expand Down Expand Up @@ -140,19 +140,25 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
@doc """
Parses subscription filter parameters into something we can pass into our `create_subscription` query.
We currently support the following filters: 'eq', 'neq', 'lt', 'lte', 'gt', 'gte`
We currently support the following filters: 'eq', 'neq', 'lt', 'lte', 'gt', 'gte', 'in'
## Examples
iex> params = %{"schema" => "public", "table" => "messages", "filter" => "subject=eq.hey"}
iex> Extensions.PostgresCdcRls.Subscriptions.parse_subscription_params(params)
{:ok, ["public", "messages", [{"subject", "eq", "hey"}]]}
`in` filter:
iex> params = %{"schema" => "public", "table" => "messages", "filter" => "subject=in.(hidee,ho)"}
iex> Extensions.PostgresCdcRls.Subscriptions.parse_subscription_params(params)
{:ok, ["public", "messages", [{"subject", "in", "{hidee,ho}"}]]}
An unsupported filter will respond with an error tuple:
iex> params = %{"schema" => "public", "table" => "messages", "filter" => "subject=in.hey"}
iex> params = %{"schema" => "public", "table" => "messages", "filter" => "subject=like.hey"}
iex> Extensions.PostgresCdcRls.Subscriptions.parse_subscription_params(params)
{:error, ~s(Error parsing `filter` params: ["in", "hey"])}
{:error, ~s(Error parsing `filter` params: ["like", "hey"])}
Catch `undefined` filters:
Expand All @@ -168,10 +174,15 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
%{"schema" => schema, "table" => table, "filter" => filter} ->
with [col, rest] <- String.split(filter, "=", parts: 2),
[filter_type, value] when filter_type in @filter_types <-
String.split(rest, ".", parts: 2) do
{:ok, [schema, table, [{col, filter_type, value}]]}
String.split(rest, ".", parts: 2),
{:ok, formatted_value} <- format_filter_value(filter_type, value) do
{:ok, [schema, table, [{col, filter_type, formatted_value}]]}
else
e -> {:error, "Error parsing `filter` params: #{inspect(e)}"}
{:error, msg} ->
{:error, "Error parsing `filter` params: #{msg}"}

e ->
{:error, "Error parsing `filter` params: #{inspect(e)}"}
end

%{"schema" => schema, "table" => table} ->
Expand All @@ -188,4 +199,20 @@ defmodule Extensions.PostgresCdcRls.Subscriptions do
"No subscription params provided. Please provide at least a `schema` or `table` to subscribe to."}
end
end

defp format_filter_value(filter, value) do
case filter do
"in" ->
case Regex.run(~r/^\((.*)\)$/, value) do
nil ->
{:error, "`in` filter value must be wrapped by parentheses"}

[_, new_value] ->
{:ok, "{#{new_value}}"}
end

_ ->
{:ok, value}
end
end
end
16 changes: 1 addition & 15 deletions lib/extensions/postgres_cdc_stream/cdc_stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ defmodule Extensions.PostgresCdcStream do

def start_distributed(%{"region" => region, "id" => tenant} = args) do
fly_region = PostgresCdc.aws_to_fly(region)
launch_node = launch_node(tenant, fly_region, node())
launch_node = PostgresCdc.launch_node(tenant, fly_region, node())

Logger.warning(
"Starting distributed postgres extension #{inspect(lauch_node: launch_node, region: region, fly_region: fly_region)}"
Expand All @@ -77,20 +77,6 @@ defmodule Extensions.PostgresCdcStream do
end
end

def launch_node(tenant, fly_region, default) do
case PostgresCdc.region_nodes(fly_region) do
[_ | _] = regions_nodes ->
member_count = Enum.count(regions_nodes)
index = :erlang.phash2(tenant, member_count)
{_, [node: launch_node]} = Enum.at(regions_nodes, index)
launch_node

_ ->
Logger.warning("Didn't find launch_node, return default #{inspect(default)}")
default
end
end

@spec start(map()) :: :ok | {:error, :already_started | :reserved}
def start(args) do
addrtype =
Expand Down
57 changes: 25 additions & 32 deletions lib/realtime/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.Api do
"""
require Logger

import Ecto.Query, warn: false, only: [from: 2]
import Ecto.Query

alias Realtime.{Repo, Api.Tenant, Api.Extensions, RateCounter, GenCounter}

Expand All @@ -17,14 +17,36 @@ defmodule Realtime.Api do
[%Tenant{}, ...]
"""
def list_tenants do
def list_tenants() do
repo_replica = Repo.replica()

Tenant
|> repo_replica.all()
|> repo_replica.preload(:extensions)
end

def list_tenants(opts) when is_list(opts) do
repo_replica = Repo.replica()

field = Keyword.get(opts, :sort_by, "inserted_at") |> String.to_atom()
external_id = Keyword.get(opts, :search)
limit = Keyword.get(opts, :limit, 50)
order = Keyword.get(opts, :order, "desc") |> String.to_atom()

query =
Tenant
|> order_by({^order, ^field})
|> limit(^limit)

ilike = "#{external_id}%"

query = if external_id, do: query |> where([t], ilike(t.external_id, ^ilike)), else: query

query
|> repo_replica.all()
|> repo_replica.preload(:extensions)
end

@doc """
Gets a single tenant.
Expand Down Expand Up @@ -156,7 +178,7 @@ defmodule Realtime.Api do
end

def preload_counters(%Tenant{} = tenant) do
id = {:limit, :all, tenant.external_id}
id = {:plug, :requests, tenant.external_id}

preload_counters(tenant, id)
end
Expand All @@ -173,33 +195,4 @@ defmodule Realtime.Api do
|> Map.put(:events_per_second_rolling, avg)
|> Map.put(:events_per_second_now, current)
end

def get_tenant_limits(%Tenant{} = tenant) do
limiter_keys = [
{:limit, :all, tenant.external_id},
{:limit, :user_channels, tenant.external_id},
{:limit, :channel_joins, tenant.external_id},
{:limit, :tenant_events, tenant.external_id}
]

nodes = [Node.self() | Node.list()]

nodes
|> Enum.map(fn node ->
Task.Supervisor.async({Realtime.TaskSupervisor, node}, fn ->
for {_key, name, _external_id} = key <- limiter_keys do
{_status, response} = Realtime.GenCounter.get(key)

%{
external_id: tenant.external_id,
node: node,
limiter: name,
counter: response
}
end
end)
end)
|> Task.await_many()
|> List.flatten()
end
end
3 changes: 2 additions & 1 deletion lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ defmodule Realtime.Application do
RealtimeWeb.Endpoint,
RealtimeWeb.Presence,
{Task.Supervisor, name: Realtime.TaskSupervisor},
Realtime.Latency
Realtime.Latency,
Realtime.Telemetry.Logger
] ++ extensions_supervisors

children =
Expand Down
39 changes: 39 additions & 0 deletions lib/realtime/helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,45 @@ defmodule Realtime.Helpers do
end
end

@doc """
Gets a short node name from a node name when a node name looks like `realtime-prod@fdaa:0:cc:a7b:b385:83c3:cfe3:2`
## Examples
iex> node = Node.self()
iex> Realtime.Helpers.short_node_id_from_name(node)
"nohost"
iex> node = :"realtime-prod@fdaa:0:cc:a7b:b385:83c3:cfe3:2"
iex> Realtime.Helpers.short_node_id_from_name(node)
"83c3cfe3"
iex> node = :"pink@127.0.0.1"
iex> Realtime.Helpers.short_node_id_from_name(node)
"127.0.0.1"
iex> node = :"pink@10.0.1.1"
iex> Realtime.Helpers.short_node_id_from_name(node)
"10.0.1.1"
iex> node = :"realtime@host.name.internal"
iex> Realtime.Helpers.short_node_id_from_name(node)
"host.name.internal"
"""

@spec short_node_id_from_name(atom()) :: String.t()
def short_node_id_from_name(name) when is_atom(name) do
[_, host] = name |> Atom.to_string() |> String.split("@", parts: 2)

case String.split(host, ":", parts: 8) do
[_, _, _, _, _, one, two, _] ->
one <> two

_other ->
host
end
end

defp pad(data) do
to_add = 16 - rem(byte_size(data), 16)
data <> :binary.copy(<<to_add>>, to_add)
Expand Down
Loading

0 comments on commit 36c8a4b

Please sign in to comment.