Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Allow Overriding of max_channels_per_client via Environment Variable #866

Merged
merged 20 commits into from
Jun 18, 2024
Merged

fix: Allow Overriding of max_channels_per_client via Environment Variable #866

merged 20 commits into from
Jun 18, 2024

Conversation

barrownicholas
Copy link
Contributor

What kind of change does this PR introduce?

Bug fix: allows overriding the default of 100 for max_channels_per_client on a realtime tenant.

What is the current behavior?

The default is locked at 100 without any ready way to override it.

What is the new behavior?

Allows a user to specify MAX_CHANNELS_PER_CLIENT in the runtime environment to override this value.

Additional context

Fixes #843

Copy link

vercel bot commented May 3, 2024

@barrownicholas is attempting to deploy a commit to the Supabase Team on Vercel.

A member of the Team first needs to authorize it.

@barrownicholas
Copy link
Contributor Author

@filipecabaco when you have time could you take a peak at this? You've been a huge help in the past, sorry to bother, you're just the best point-of-contact I have!!

@barrownicholas barrownicholas changed the title Allow Overriding of max_channels_per_client via Environment Variable fix: Allow Overriding of max_channels_per_client via Environment Variable May 3, 2024
priv/repo/seeds.exs Outdated Show resolved Hide resolved
@barrownicholas
Copy link
Contributor Author

Also, can confirm that this built successfully in Docker on my local machine

@filipecabaco
Copy link
Contributor

👋 will check today sorry for the delay

Copy link
Contributor

@filipecabaco filipecabaco left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor changes so it's a bit more streamlined 👍

@barrownicholas
Copy link
Contributor Author

@filipecabaco good idea, just pushed those in 11197b4

Copy link
Contributor

@filipecabaco filipecabaco left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change would not take into consideration limits set per tenant.

We should add this as a default value of the tenant start here: https://github.com/supabase/realtime/blob/f828f84834959406561599383da9efca6ff79ce8/lib/realtime/api/tenant.ex#L23C19-L23C52

    field(:max_channels_per_client, :integer, default: Application.get_env(:realtime, :tenant_max_channels_per_client))

I will also probably do the same to all other values as this is a very important change that now became way clearer!

Big thank you for bringing this up as this will has a lot of potential of improving the self hosting experience massively.

config/runtime.exs Outdated Show resolved Hide resolved
config/runtime.exs Outdated Show resolved Hide resolved
lib/realtime_web/channels/realtime_channel.ex Outdated Show resolved Hide resolved
priv/repo/seeds.exs Outdated Show resolved Hide resolved
README.md Outdated Show resolved Hide resolved
barrownicholas and others added 5 commits June 18, 2024 10:08
Co-authored-by: Filipe Cabaço <filipecabaco@gmail.com>
Co-authored-by: Filipe Cabaço <filipecabaco@gmail.com>
Co-authored-by: Filipe Cabaço <filipecabaco@gmail.com>
Co-authored-by: Filipe Cabaço <filipecabaco@gmail.com>
@barrownicholas
Copy link
Contributor Author

@filipecabaco good idea about updating the defaults... I may undertake a larger project and set TENANT_... variables for all the defaults after we wrap this up. Changes should be good for you to review again when you have a sec.

lib/realtime/api/tenant.ex Outdated Show resolved Hide resolved
mix.exs Outdated Show resolved Hide resolved
@filipecabaco
Copy link
Contributor

@filipecabaco good idea about updating the defaults... I may undertake a larger project and set TENANT_... variables for all the defaults after we wrap this up. Changes should be good for you to review again when you have a sec.

that would be awesome! and if you have any other feedback on self hosted issues you found please do ping

@barrownicholas
Copy link
Contributor Author

@filipecabaco good idea about updating the defaults... I may undertake a larger project and set TENANT_... variables for all the defaults after we wrap this up. Changes should be good for you to review again when you have a sec.

that would be awesome! and if you have any other feedback on self hosted issues you found please do ping

Will do then! As soon as this ships, I'll open up a new fork + PR and start that

Co-authored-by: Filipe Cabaço <filipecabaco@gmail.com>
@filipecabaco
Copy link
Contributor

small ci error, could you run mix format and commit?

@barrownicholas
Copy link
Contributor Author

sorry about that, should be good now @filipecabaco

@barrownicholas
Copy link
Contributor Author

@filipecabaco looks like I accidenatally duplicated lines 24-26 in tenant.ex while trying to resolve merge conflicts, which produced the failure in the CI workflow. I think it should be good to go now.

@filipecabaco
Copy link
Contributor

filipecabaco commented Jun 18, 2024

🤦‍♂️

my bad forgot that schema is a compiled thing, tenant.ex should actually do this with a changeset change. Here's a proposal for the change that can easily adapt to the other fields when we need to do it
lib/realtime/api/tenant.ex

defmodule Realtime.Api.Tenant do
  @moduledoc """
  Describes a database/tenant which makes use of the realtime service.
  """
  use Ecto.Schema
  import Ecto.Changeset
  alias Realtime.Api.Extensions
  alias Realtime.Crypto

  @type t :: %__MODULE__{}

  @primary_key {:id, :binary_id, autogenerate: true}
  @foreign_key_type :binary_id
  schema "tenants" do
    field(:name, :string)
    field(:external_id, :string)
    field(:jwt_secret, :string)
    field(:jwt_jwks, :map)
    field(:postgres_cdc_default, :string)
    field(:max_concurrent_users, :integer, default: 200)
    field(:max_events_per_second, :integer, default: 100)
    field(:max_bytes_per_second, :integer, default: 100_000)
    field(:max_channels_per_client, :integer)
    field(:max_joins_per_second, :integer, default: 100)
    field(:suspend, :boolean, default: false)
    field(:events_per_second_rolling, :float, virtual: true)
    field(:events_per_second_now, :integer, virtual: true)
    field(:enable_authorization, :boolean, default: false)

    has_many(:extensions, Realtime.Api.Extensions,
      foreign_key: :tenant_external_id,
      references: :external_id,
      on_delete: :delete_all,
      on_replace: :delete
    )

    timestamps()
  end

  @doc false
  def changeset(tenant, attrs) do
    # TODO: remove after infra update
    extension_key =
      if attrs[:extensions] do
        :extensions
      else
        "extensions"
      end

    attrs =
      if attrs[extension_key] do
        ext =
          Enum.map(attrs[extension_key], fn
            %{"type" => "postgres"} = e -> %{e | "type" => "postgres_cdc_rls"}
            e -> e
          end)

        %{attrs | extension_key => ext}
      else
        attrs
      end

    tenant
    |> cast(attrs, [
      :name,
      :external_id,
      :jwt_secret,
      :jwt_jwks,
      :max_concurrent_users,
      :max_events_per_second,
      :postgres_cdc_default,
      :max_bytes_per_second,
      :max_channels_per_client,
      :max_joins_per_second,
      :suspend,
      :enable_authorization
    ])
    |> validate_required([
      :external_id,
      :jwt_secret
    ])
    |> unique_constraint([:external_id])
    |> encrypt_jwt_secret()
    |> maybe_set_default(:max_channels_per_client, :tenant_max_channels_per_client)
    |> cast_assoc(:extensions, with: &Extensions.changeset/2)
  end

  def maybe_set_default(changeset, property, config_key) do
    has_key? = Map.get(changeset.data, property) || Map.get(changeset.changes, property)

    if has_key? do
      changeset
    else
      put_change(changeset, property, Application.fetch_env!(:realtime, config_key))
    end
  end

  def encrypt_jwt_secret(changeset) do
    update_change(changeset, :jwt_secret, &Crypto.encrypt!/1)
  end
end

and we also need to change the runtime.exs we have at the moment ignores tests:

import Config

config :logflare_logger_backend,
  url: System.get_env("LOGFLARE_LOGGER_BACKEND_URL", "https://api.logflare.app")

app_name = System.get_env("FLY_APP_NAME", "")
default_db_host = System.get_env("DB_HOST", "localhost")
username = System.get_env("DB_USER", "postgres")
password = System.get_env("DB_PASSWORD", "postgres")
database = System.get_env("DB_NAME", "postgres")
port = System.get_env("DB_PORT", "5432")
slot_name_suffix = System.get_env("SLOT_NAME_SUFFIX")

config :realtime,
  tenant_max_channels_per_client:
    System.get_env("TENANT_MAX_CHANNELS_PER_CLIENT", "100") |> String.to_integer()

if config_env() == :prod do
  secret_key_base =
    System.get_env("SECRET_KEY_BASE") ||
      raise """
      environment variable SECRET_KEY_BASE is missing.
      You can generate one by calling: mix phx.gen.secret
      """

  if app_name == "" do
    raise "APP_NAME not available"
  end

  config :realtime, RealtimeWeb.Endpoint,
    server: true,
    url: [host: "#{app_name}.fly.dev", port: 80],
    http: [
      port: String.to_integer(System.get_env("PORT") || "4000"),
      protocol_options: [
        max_header_value_length: String.to_integer(System.get_env("MAX_HEADER_LENGTH") || "4096")
      ],
      transport_options: [
        # max_connection is per connection supervisor
        # num_conns_sups defaults to num_acceptors
        # total conns accepted here is max_connections * num_acceptors
        # ref: https://ninenines.eu/docs/en/ranch/2.0/manual/ranch/
        max_connections: String.to_integer(System.get_env("MAX_CONNECTIONS") || "1000"),
        num_acceptors: String.to_integer(System.get_env("NUM_ACCEPTORS") || "100"),
        # IMPORTANT: support IPv6 addresses
        socket_opts: [:inet6]
      ]
    ],
    check_origin: false,
    secret_key_base: secret_key_base
end

if config_env() != :test do
  platform = if System.get_env("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE", do: :aws, else: :fly

  config :realtime,
    secure_channels: System.get_env("SECURE_CHANNELS", "true") == "true",
    jwt_claim_validators: System.get_env("JWT_CLAIM_VALIDATORS", "{}"),
    api_jwt_secret: System.get_env("API_JWT_SECRET"),
    api_blocklist: System.get_env("API_TOKEN_BLOCKLIST", "") |> String.split(","),
    metrics_blocklist: System.get_env("METRICS_TOKEN_BLOCKLIST", "") |> String.split(","),
    metrics_jwt_secret: System.get_env("METRICS_JWT_SECRET"),
    db_enc_key: System.get_env("DB_ENC_KEY"),
    region: System.get_env("FLY_REGION") || System.get_env("REGION"),
    fly_alloc_id: System.get_env("FLY_ALLOC_ID", ""),
    prom_poll_rate: System.get_env("PROM_POLL_RATE", "5000") |> String.to_integer(),
    platform: platform,
    slot_name_suffix: slot_name_suffix

  queue_target = System.get_env("DB_QUEUE_TARGET", "5000") |> String.to_integer()
  queue_interval = System.get_env("DB_QUEUE_INTERVAL", "5000") |> String.to_integer()

  after_connect_query_args =
    case System.get_env("DB_AFTER_CONNECT_QUERY") do
      nil -> nil
      query -> {Postgrex, :query!, [query, []]}
    end

  config :realtime, Realtime.Repo,
    hostname: default_db_host,
    username: username,
    password: password,
    database: database,
    port: port,
    pool_size: System.get_env("DB_POOL_SIZE", "5") |> String.to_integer(),
    queue_target: queue_target,
    queue_interval: queue_interval,
    parameters: [
      application_name: "supabase_mt_realtime"
    ],
    after_connect: after_connect_query_args

  replica_repos = %{
    Realtime.Repo.Replica.FRA => System.get_env("DB_HOST_REPLICA_FRA", default_db_host),
    Realtime.Repo.Replica.IAD => System.get_env("DB_HOST_REPLICA_IAD", default_db_host),
    Realtime.Repo.Replica.SIN => System.get_env("DB_HOST_REPLICA_SIN", default_db_host),
    Realtime.Repo.Replica.SJC => System.get_env("DB_HOST_REPLICA_SJC", default_db_host),
    Realtime.Repo.Replica.Singapore => System.get_env("DB_HOST_REPLICA_SIN", default_db_host),
    Realtime.Repo.Replica.London => System.get_env("DB_HOST_REPLICA_FRA", default_db_host),
    Realtime.Repo.Replica.NorthVirginia => System.get_env("DB_HOST_REPLICA_IAD", default_db_host),
    Realtime.Repo.Replica.Oregon => System.get_env("DB_HOST_REPLICA_SJC", default_db_host),
    Realtime.Repo.Replica.SanJose => System.get_env("DB_HOST_REPLICA_SJC", default_db_host),
    Realtime.Repo.Replica.Local => default_db_host
  }

  # username, password, database, and port must match primary credentials
  for {replica_repo, hostname} <- replica_repos do
    config :realtime, replica_repo,
      hostname: hostname,
      username: username,
      password: password,
      database: database,
      port: port,
      pool_size: System.get_env("DB_REPLICA_POOL_SIZE", "5") |> String.to_integer(),
      queue_target: queue_target,
      queue_interval: queue_interval,
      parameters: [
        application_name: "supabase_mt_realtime_ro"
      ]
  end
end

default_cluster_strategy =
  config_env()
  |> case do
    :prod -> "DNS"
    _ -> "EPMD"
  end

cluster_topologies =
  System.get_env("CLUSTER_STRATEGIES", default_cluster_strategy)
  |> String.upcase()
  |> String.split(",")
  |> Enum.reduce([], fn strategy, acc ->
    strategy
    |> String.trim()
    |> case do
      "DNS" ->
        [
          fly6pn: [
            strategy: Cluster.Strategy.DNSPoll,
            config: [
              polling_interval: 5_000,
              query: System.get_env("DNS_NODES"),
              node_basename: app_name
            ]
          ]
        ] ++ acc

      "POSTGRES" ->
        version = "#{Application.spec(:realtime)[:vsn]}" |> String.replace(".", "_")

        [
          postgres: [
            strategy: Realtime.Cluster.Strategy.Postgres,
            config: [
              hostname: default_db_host,
              username: username,
              password: password,
              database: database,
              port: port,
              parameters: [
                application_name: "cluster_node_#{node()}"
              ],
              heartbeat_interval: 5_000,
              node_timeout: 15_000,
              channel_name:
                System.get_env("POSTGRES_CLUSTER_CHANNEL_NAME", "realtime_cluster_#{version}")
            ]
          ]
        ] ++ acc

      "EPMD" ->
        [
          dev: [
            strategy: Cluster.Strategy.Epmd,
            config: [
              hosts: [:"orange@127.0.0.1", :"pink@127.0.0.1"]
            ],
            connect: {:net_kernel, :connect_node, []},
            disconnect: {:net_kernel, :disconnect_node, []}
          ]
        ] ++ acc

      _ ->
        acc
    end
  end)

config :libcluster,
  debug: false,
  topologies: cluster_topologies

if System.get_env("LOGS_ENGINE") == "logflare" do
  if !System.get_env("LOGFLARE_API_KEY") or !System.get_env("LOGFLARE_SOURCE_ID") do
    raise """
    Environment variable LOGFLARE_API_KEY or LOGFLARE_SOURCE_ID is missing.
    Check those variables or choose another LOGS_ENGINE.
    """
  end

  config :logger,
    backends: [LogflareLogger.HttpBackend]
end

Tested locally and looks good
Screenshot 2024-06-18 at 19 05 20

@barrownicholas
Copy link
Contributor Author

should actually do this with a changeset change

@filipecabaco just refactored those two files based on those changes

@filipecabaco filipecabaco merged commit 7bf2ba5 into supabase:main Jun 18, 2024
2 of 3 checks passed
@filipecabaco
Copy link
Contributor

merged 🙏 thank you so much for all the patience!

@kiwicopple
Copy link
Member

🎉 This PR is included in version 2.29.10 🎉

The release is available on GitHub release

Your semantic-release bot 📦🚀

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

too_many_channels error: "{:error, :too_many_channels}"
3 participants