Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ config :gen_rpc,
# This is used for process sanitation purposes, so please make sure to set it to a sufficiently high number
async_call_inactivity_timeout: 300_000

config :prom_ex, :storage_adapter, Realtime.PromEx.Store

# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs"
14 changes: 12 additions & 2 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ queue_target = Env.get_integer("DB_QUEUE_TARGET", 5000)
queue_interval = Env.get_integer("DB_QUEUE_INTERVAL", 5000)
pool_size = Env.get_integer("DB_POOL_SIZE", 5)
master_region = System.get_env("DB_MASTER_REGION")
region = System.get_env("REGION")

after_connect_query_args =
case System.get_env("DB_AFTER_CONNECT_QUERY") do
Expand Down Expand Up @@ -94,6 +95,14 @@ socket_options =
end
end

# Host part of this node's name, i.e. the text after the "@".
# The match is intentionally strict: the name must split into exactly two parts.
[_name, node_host] = String.split(Atom.to_string(node()), "@")

# Tags stored under `config :realtime, metrics_tags: ...` further down in this file.
metrics_tags = %{
  id: Realtime.Nodes.short_node_id_from_name(node()),
  host: node_host,
  region: region
}

config :realtime, Realtime.Repo,
hostname: default_db_host,
username: username,
Expand Down Expand Up @@ -130,7 +139,8 @@ config :realtime,
users_scope_shards: users_scope_shards,
postgres_cdc_scope_shards: postgres_cdc_scope_shards,
regional_broadcasting: regional_broadcasting,
master_region: master_region
master_region: master_region,
metrics_tags: metrics_tags

if config_env() != :test && run_janitor? do
config :realtime,
Expand Down Expand Up @@ -280,7 +290,7 @@ if config_env() != :test do
metrics_blocklist: System.get_env("METRICS_TOKEN_BLOCKLIST", "") |> String.split(","),
metrics_jwt_secret: System.get_env("METRICS_JWT_SECRET"),
db_enc_key: System.get_env("DB_ENC_KEY"),
region: System.get_env("REGION"),
region: region,
prom_poll_rate: Env.get_integer("PROM_POLL_RATE", 5000),
slot_name_suffix: slot_name_suffix
end
Expand Down
3 changes: 3 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ config :realtime, RealtimeWeb.Endpoint,
http: [port: 4002],
server: true

# config/runtime.exs reads the region from the REGION environment variable,
# so set it here to match the :region value configured below
System.put_env("REGION", "us-east-1")

config :realtime,
regional_broadcasting: true,
region: "us-east-1",
Expand Down
1 change: 0 additions & 1 deletion lib/realtime/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ defmodule Realtime.Application do
{Realtime.SignalHandler, %{handler_mod: :erl_signal_handler}}
)

Realtime.PromEx.set_metrics_tags()
:ets.new(Realtime.Tenants.Connect, [:named_table, :set, :public])
:syn.set_event_handler(Realtime.SynHandler)
:ok = :syn.add_node_to_scopes([RegionNodes, Realtime.Tenants.Connect | Realtime.UsersCounter.scopes()])
Expand Down
36 changes: 13 additions & 23 deletions lib/realtime/metrics_cleaner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ defmodule Realtime.MetricsCleaner do
def handle_info(:check, %{interval: interval} = state) do
Process.cancel_timer(state.check_ref)

{exec_time, _} = :timer.tc(fn -> loop_and_cleanup_metrics_table() end)
{exec_time, _} = :timer.tc(fn -> loop_and_cleanup_metrics_table() end, :millisecond)

if exec_time > :timer.seconds(5),
do: Logger.warning("Metrics check took: #{exec_time} ms")
Expand All @@ -31,31 +31,21 @@ defmodule Realtime.MetricsCleaner do
{:noreply, state}
end

defp check(interval) do
Process.send_after(self(), :check, interval)
end
defp check(interval), do: Process.send_after(self(), :check, interval)

@peep_filter_spec [{{{:_, %{tenant: :"$1"}}, :_}, [{:is_binary, :"$1"}], [:"$1"]}]

@metrics_table Realtime.PromEx.Metrics
@filter_spec [{{{:_, %{tenant: :"$1"}}, :_}, [], [:"$1"]}]
defp loop_and_cleanup_metrics_table do
tenant_ids = Realtime.Tenants.Connect.list_tenants()
tenant_ids = Realtime.Tenants.Connect.list_tenants() |> MapSet.new()

:ets.select(@metrics_table, @filter_spec)
|> Enum.uniq()
|> Enum.reject(fn tenant_id -> tenant_id in tenant_ids end)
|> Enum.each(fn tenant_id -> delete_metric(tenant_id) end)
end
{_, tids} = Peep.Persistent.storage(Realtime.PromEx.Metrics)

@doc """
Deletes all metrics that contain the given tenant or database_host.
"""
@spec delete_metric(String.t()) :: :ok
def delete_metric(tenant) do
:ets.select_delete(@metrics_table, [
{{{:_, %{tenant: tenant}}, :_}, [], [true]},
{{{:_, %{database_host: "db.#{tenant}.supabase.co"}}, :_}, [], [true]}
])

:ok
tids
|> Tuple.to_list()
|> Stream.flat_map(fn tid -> :ets.select(tid, @peep_filter_spec) end)
|> Enum.uniq()
|> Stream.reject(fn tenant_id -> MapSet.member?(tenant_ids, tenant_id) end)
|> Enum.map(fn tenant_id -> %{tenant: tenant_id} end)
|> then(&Peep.prune_tags(Realtime.PromEx.Metrics, &1))
end
end
63 changes: 24 additions & 39 deletions lib/realtime/monitoring/prom_ex.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
defmodule Realtime.PromEx do
alias Realtime.Nodes
alias Realtime.PromEx.Plugins.Channels
alias Realtime.PromEx.Plugins.Distributed
alias Realtime.PromEx.Plugins.GenRpc
Expand Down Expand Up @@ -65,6 +64,29 @@ defmodule Realtime.PromEx do

alias PromEx.Plugins

defmodule Store do
  @moduledoc false
  # PromEx storage adapter backed by Peep. It starts Peep with striped storage
  # and with the global tags read from the :realtime application environment.

  @behaviour PromEx.Storage

  # Collects every metric stored under `name` and hands it to Peep's
  # Prometheus exporter.
  @impl true
  def scrape(name) do
    name
    |> Peep.get_all_metrics()
    |> Peep.Prometheus.export()
  end

  # Builds the Peep child spec; falls back to no global tags when
  # :metrics_tags has not been configured.
  @impl true
  def child_spec(name, metrics) do
    global_tags = Application.get_env(:realtime, :metrics_tags, %{})

    Peep.child_spec(name: name, metrics: metrics, global_tags: global_tags, storage: :striped)
  end
end

@impl true
def plugins do
poll_rate = Application.get_env(:realtime, :prom_poll_rate)
Expand Down Expand Up @@ -105,28 +127,7 @@ defmodule Realtime.PromEx do
end

def get_metrics do
%{
region: region,
node_host: node_host,
short_alloc_id: short_alloc_id
} = get_metrics_tags()

def_tags = "host=\"#{node_host}\",region=\"#{region}\",id=\"#{short_alloc_id}\""

metrics =
PromEx.get_metrics(Realtime.PromEx)
|> String.split("\n")
|> Enum.map_join("\n", fn line ->
case Regex.run(~r/(?!\#)^(\w+)(?:{(.*?)})?\s*(.+)$/, line) do
nil ->
line

[_, key, tags, value] ->
tags = if tags == "", do: def_tags, else: tags <> "," <> def_tags

"#{key}{#{tags}} #{value}"
end
end)
metrics = PromEx.get_metrics(Realtime.PromEx)

Realtime.PromEx.__ets_cron_flusher_name__()
|> PromEx.ETSCronFlusher.defer_ets_flush()
Expand All @@ -140,20 +141,4 @@ defmodule Realtime.PromEx do
get_metrics()
|> :zlib.compress()
end

def set_metrics_tags do
[_, node_host] = node() |> Atom.to_string() |> String.split("@")

metrics_tags = %{
region: Application.get_env(:realtime, :region),
node_host: node_host,
short_alloc_id: Nodes.short_node_id_from_name(node())
}

Application.put_env(:realtime, :metrics_tags, metrics_tags)
end

def get_metrics_tags do
Application.get_env(:realtime, :metrics_tags)
end
end
3 changes: 2 additions & 1 deletion lib/realtime/monitoring/prom_ex/plugins/distributed.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ defmodule Realtime.PromEx.Plugins.Distributed do
measurement: :size,
tags: [:origin_node, :target_node]
)
]
],
detach_on_error: false
)
end

Expand Down
3 changes: 2 additions & 1 deletion lib/realtime/monitoring/prom_ex/plugins/gen_rpc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ defmodule Realtime.PromEx.Plugins.GenRpc do
measurement: :size,
tags: [:origin_node, :target_node]
)
]
],
detach_on_error: false
)
end

Expand Down
3 changes: 2 additions & 1 deletion lib/realtime/monitoring/prom_ex/plugins/osmon.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ defmodule Realtime.PromEx.Plugins.OsMon do
description: "The average system load in the last 15 minutes.",
measurement: :avg15
)
]
],
detach_on_error: false
)
end

Expand Down
16 changes: 9 additions & 7 deletions lib/realtime/monitoring/prom_ex/plugins/phoenix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ if Code.ensure_loaded?(Phoenix) do
description: "The total open connections to ranch.",
measurement: :active
)
]
],
detach_on_error: false
)
end

Expand All @@ -66,6 +67,11 @@ if Code.ensure_loaded?(Phoenix) do
:telemetry.execute(@event_all_connections, %{active: active_conn}, %{})
end

defmodule Buckets do
@moduledoc false
# Fixed bucket boundaries shared by the channel and socket duration metrics in
# this module, which reference it via `reporter_options: [peep_bucket_calculator: Buckets]`.
# NOTE(review): the boundaries are presumably in the same unit as the reported
# durations (likely milliseconds) — confirm against the metric definitions.
use Peep.Buckets.Custom, buckets: [10, 100, 500, 1_000, 5_000, 10_000]
end

defp channel_events(metric_prefix) do
Event.build(
:phoenix_channel_event_metrics,
Expand Down Expand Up @@ -94,9 +100,7 @@ if Code.ensure_loaded?(Phoenix) do
event_name: [:phoenix, :channel_handled_in],
measurement: :duration,
description: "The time it takes for the application to respond to channel messages.",
reporter_options: [
buckets: [10, 100, 500, 1_000, 5_000, 10_000]
],
reporter_options: [peep_bucket_calculator: Buckets],
tag_values: fn %{socket: %Socket{endpoint: endpoint}} ->
%{
endpoint: normalize_module_name(endpoint)
Expand All @@ -119,9 +123,7 @@ if Code.ensure_loaded?(Phoenix) do
event_name: [:phoenix, :socket_connected],
measurement: :duration,
description: "The time it takes for the application to establish a socket connection.",
reporter_options: [
buckets: [10, 100, 500, 1_000, 5_000, 10_000]
],
reporter_options: [peep_bucket_calculator: Buckets],
tag_values: fn %{result: result, endpoint: endpoint, transport: transport, serializer: serializer} ->
%{
transport: transport,
Expand Down
Loading