Skip to content

Commit

Permalink
fix: add tenant db transactions and stop channel when db error
Browse files Browse the repository at this point in the history
  • Loading branch information
w3b6x9 committed Jun 8, 2022
1 parent 8fed49e commit 533a54e
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 97 deletions.
4 changes: 2 additions & 2 deletions lib/extensions/postgres/postgres.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ defmodule Extensions.Postgres do

{:error, error} ->
Logger.error("Can't start Postgres ext #{inspect(error, pretty: true)}")
{:error, error}
:ok

{:badrpc, error} ->
Logger.error("Bad RPC call #{inspect(error, pretty: true)}")
{:error, error}
:ok
end
end

Expand Down
36 changes: 16 additions & 20 deletions lib/extensions/postgres/postgres_replications.ex
Original file line number Diff line number Diff line change
@@ -1,27 +1,23 @@
defmodule Extensions.Postgres.Replications do
require Logger
import Postgrex, only: [transaction: 2, query: 3, query!: 3]
import Postgrex, only: [query: 3]

def prepare_replication(conn, slot_name) do
transaction(conn, fn conn ->
query!(
conn,
"select
case when not exists (
select 1
from pg_replication_slots
where slot_name = $1
)
then (
select 1 from pg_create_logical_replication_slot($1, 'wal2json', 'true')
)
else 1
end;",
[slot_name]
)

query!(conn, "set search_path = ''", [])
end)
query(
conn,
"select
case when not exists (
select 1
from pg_replication_slots
where slot_name = $1
)
then (
select 1 from pg_create_logical_replication_slot($1, 'wal2json', 'true')
)
else 1
end;",
[slot_name]
)
end

def list_changes(conn, slot_name, publication, max_changes, max_record_bytes) do
Expand Down
33 changes: 14 additions & 19 deletions lib/extensions/postgres/postgres_subscription_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule Extensions.Postgres.SubscriptionManager do
import Realtime.Helpers, only: [cancel_timer: 1]

@check_active_interval 15_000
@check_oids_interval 15_000
@check_oids_interval 60_000
@queue_target 5_000
@pool_size 5
@subscribe_timeout 15_000
Expand Down Expand Up @@ -86,22 +86,22 @@ defmodule Extensions.Postgres.SubscriptionManager do

@impl true
def handle_call(
{:subscribe, opts},
{:subscribe, %{channel_pid: pid, claims: claims, config: config, id: id} = opts},
_,
%{check_active_pids: ref, subscribers_tid: tid, oids: oids} = state
%{check_active_pids: ref, publication: publication, subscribers_tid: tid} = state
) do
Logger.debug("Subscribe #{inspect(opts, pretty: true)}")
pid = opts.channel_pid

subscription_opts = %{
id: opts.id,
config: opts.config,
claims: opts.claims
id: id,
config: config,
claims: claims
}

true = :ets.insert(tid, {pid, opts.id, opts.config, opts.claims, Process.monitor(pid)})
monitor_ref = Process.monitor(pid)
true = :ets.insert(tid, {pid, id, config, claims, monitor_ref})

create_resp = Subscriptions.create(state.conn, subscription_opts, oids)
create_resp = Subscriptions.create(state.conn, publication, subscription_opts)

new_state =
if ref == nil do
Expand All @@ -117,7 +117,7 @@ defmodule Extensions.Postgres.SubscriptionManager do
def handle_call(:subscribers_list, _, state) do
subscribers =
:ets.foldl(
fn {pid, _, _}, acc ->
fn {pid, _, _, _, _}, acc ->
[pid | acc]
end,
[],
Expand All @@ -135,14 +135,9 @@ defmodule Extensions.Postgres.SubscriptionManager do
cancel_timer(ref)

oids =
case Subscriptions.fetch_publication_tables(conn, publication) do
^old_oids ->
old_oids

new_oids ->
Logger.warning("Found new oids #{inspect(new_oids, pretty: true)}")
Subscriptions.update_all(conn, state.subscribers_tid, new_oids)
new_oids
case Subscriptions.update_all(conn, state.subscribers_tid, publication, old_oids) do
{:ok, new_oids} -> new_oids
{:error, _} -> old_oids
end

{:noreply, %{state | oids: oids, check_oid_ref: check_oids()}}
Expand Down Expand Up @@ -176,7 +171,7 @@ defmodule Extensions.Postgres.SubscriptionManager do
nil

_ ->
Logger.error("Detected zombi subscriber")
Logger.error("Detected phantom subscriber")
:ets.delete(tid, pid)
Subscriptions.delete(state.conn, UUID.string_to_binary!(postgres_id))
end
Expand Down
93 changes: 53 additions & 40 deletions lib/extensions/postgres/postgres_subscriptions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,55 @@ defmodule Extensions.Postgres.Subscriptions do
This module consolidates subscriptions handling
"""
require Logger
import Postgrex, only: [transaction: 2, query: 3, query!: 3]
import Postgrex, only: [transaction: 2, query: 3, query!: 3, rollback: 2]

@type tid() :: :ets.tid()
@type conn() :: DBConnection.conn()

@spec create(conn(), String.t(), map()) :: :ok
@spec create(conn(), String.t(), map()) :: {:ok, list(Postgrex.Result.t())} | {:error, any()}
def create(conn, publication, params) do
case fetch_publication_tables(conn, publication) do
oids when oids != %{} ->
if !insert_topic_subscriptions(conn, params, oids) do
Logger.error("Didn't create the subscription #{inspect(params.config)}")
end
transaction(conn, fn conn ->
case fetch_publication_tables(conn, publication) do
oids when oids != %{} ->
case insert_topic_subscriptions(conn, params, oids) do
{:ok, result} ->
result

{:error, error} ->
Logger.error("Didn't create the subscription #{inspect(params.config)}")
rollback(conn, error)
end

other ->
Logger.error("Unacceptable oids #{inspect(other)}")
end
_ ->
rollback(conn, "Entity oids do not exist")
end
end)
end

@spec update_all(conn(), tid(), map()) :: :ok
def update_all(conn, tid, oids) do
delete_all(conn)
@spec update_all(conn(), tid(), String.t(), map()) :: {:ok, map()} | {:error, any()}
def update_all(conn, tid, publication, oids) do
transaction(conn, fn conn ->
new_oids = fetch_publication_tables(conn, publication)

fn {_pid, id, config, claims, _}, _ ->
subscription_opts = %{
id: id,
config: config,
claims: claims
}
if oids != new_oids do
delete_all(conn)

create(conn, subscription_opts, oids)
end
|> :ets.foldl(nil, tid)
fn {_pid, id, config, claims, _}, _ ->
subscription_opts = %{
id: id,
config: config,
claims: claims
}

create(conn, publication, subscription_opts)
end
|> :ets.foldl(nil, tid)

new_oids
else
rollback(conn, "No change in publication entity oids")
end
end)
end

@spec delete(conn(), String.t()) :: any()
Expand Down Expand Up @@ -107,6 +124,7 @@ defmodule Extensions.Postgres.Subscriptions do
|> Map.update({schema}, [oid], &[oid | &1])
|> Map.update({"*"}, [oid], &[oid | &1])
end)
|> Enum.reduce(%{}, fn {k, v}, acc -> Map.put(acc, k, Enum.sort(v)) end)

_ ->
%{}
Expand Down Expand Up @@ -164,12 +182,13 @@ defmodule Extensions.Postgres.Subscriptions do
end
end

@spec insert_topic_subscriptions(conn(), map(), map()) :: boolean()
@spec insert_topic_subscriptions(conn(), map(), map()) ::
{:ok, list(Postgrex.Result.t())} | {:error, any()}
def insert_topic_subscriptions(conn, params, oids) do
transform_to_oid_view(oids, params.config)
|> case do
nil ->
false
{:error, "No match between subscription params and entity oids"}

views ->
bin_uuid = UUID.string_to_binary!(params.id)
Expand All @@ -179,22 +198,16 @@ defmodule Extensions.Postgres.Subscriptions do
on conflict (subscription_id, entity, filters)
do update set claims = excluded.claims, created_at = now()"

Enum.reduce(views, true, fn view, acc ->
{entity, filters} =
case view do
{entity, filters} -> {entity, filters}
entity -> {entity, []}
end

query(conn, sql, [bin_uuid, entity, filters, params.claims])
|> case do
{:error, reason} ->
Logger.error("Insert subscriptions query #{inspect(reason)}")
false

_ ->
acc
end
transaction(conn, fn conn ->
Enum.reduce(views, [], fn view, acc ->
{entity, filters} =
case view do
{entity, filters} -> {entity, filters}
entity -> {entity, []}
end

[query!(conn, sql, [bin_uuid, entity, filters, params.claims]) | acc]
end)
end)
end
end
Expand Down
11 changes: 5 additions & 6 deletions lib/realtime_web/channels/realtime_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -152,24 +152,23 @@ defmodule RealtimeWeb.RealtimeChannel do
Logger.info("Subscribe channel for #{tenant} to #{postgres_topic}")

Process.monitor(manager_pid)
socket

{:noreply, socket}

:ok ->
Logger.warning("Re-subscribe channel for #{tenant}")

ref = Process.send_after(self(), :postgres_subscribe, 5_000)
assign(socket, :pg_sub_ref, ref)

{:noreply, assign(socket, :pg_sub_ref, ref)}

{:error, error} ->
Logger.error(
"Failed to subscribe channel for #{tenant} to #{postgres_topic}: #{inspect(error)}"
)

ref = Process.send_after(self(), :postgres_subscribe, 5_000)
assign(socket, :pg_sub_ref, ref)
{:stop, %{reason: inspect(error)}, socket}
end

{:noreply, new_socket}
end

def handle_info(
Expand Down
21 changes: 15 additions & 6 deletions test/realtime/extensions/postgres/postgres_subscriptions_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -36,43 +36,52 @@ defmodule Realtime.Extensions.PostgresSubscriptionsTest do
describe "insert_topic_subscriptions/3" do
test "success insert into db", %{} do
with_mocks([
{Postgrex, [], [query: fn _, _, _ -> {:ok, :result} end]}
{Postgrex, [], [transaction: fn _, _ -> {:ok, :result} end]}
]) do
params = %{
id: UUID.uuid1(),
config: @realtime_config,
claims: %{}
}

assert match?(true, Subscriptions.insert_topic_subscriptions(:conn, params, @oids))
assert match?(
{:ok, :result},
Subscriptions.insert_topic_subscriptions(:conn, params, @oids)
)
end
end

test "not success insert into db", %{} do
with_mocks([
{Postgrex, [], [query: fn _, _, _ -> {:error, :some_reason} end]}
{Postgrex, [], [transaction: fn _, _ -> {:error, :some_reason} end]}
]) do
params = %{
id: UUID.uuid1(),
config: @realtime_config,
claims: %{}
}

assert match?(false, Subscriptions.insert_topic_subscriptions(:conn, params, @oids))
assert match?(
{:error, :some_reason},
Subscriptions.insert_topic_subscriptions(:conn, params, @oids)
)
end
end

test "user can't listen changes", %{} do
with_mocks([
{Postgrex, [], [query: fn _, _, _ -> {:ok, :result} end]}
{Postgrex, [], [transaction: fn _, _, _ -> {:ok, :result} end]}
]) do
params = %{
id: UUID.uuid1(),
config: @realtime_config_undef_table,
claims: %{}
}

assert match?(false, Subscriptions.insert_topic_subscriptions(:conn, params, @oids))
assert match?(
{:error, "No match between subscription params and entity oids"},
Subscriptions.insert_topic_subscriptions(:conn, params, @oids)
)
end
end
end
Expand Down
9 changes: 5 additions & 4 deletions test/realtime/extensions/postgres/postgres_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ defmodule Realtime.Extensions.PostgresTest do
use RealtimeWeb.ConnCase

import Mock
import Extensions.Postgres.Helpers, only: [filter_postgres_settings: 1]

alias Extensions.Postgres
alias Realtime.Api
alias Realtime.Api.Tenant
alias RealtimeWeb.{ChannelsAuthorization, Joken.CurrentTime, UserSocket}
import Extensions.Postgres.Helpers, only: [filter_postgres_settings: 1]
alias Extensions.Postgres
alias Postgres.SubscriptionManager
alias Postgrex, as: P

@external_id "dev_tenant"
Expand Down Expand Up @@ -72,12 +73,12 @@ defmodule Realtime.Extensions.PostgresTest do

%{conn: conn, oids: oids} = :sys.get_state(subscriber_manager_pid)

P.query(conn, "drop publication supabase_multiplayer", [])
P.query!(conn, "drop publication supabase_realtime", [])
send(subscriber_manager_pid, :check_oids)
%{oids: oids2} = :sys.get_state(subscriber_manager_pid)
assert !Map.equal?(oids, oids2)

P.query(conn, "create publication supabase_multiplayer for all tables", [])
P.query!(conn, "create publication supabase_realtime for all tables", [])
send(subscriber_manager_pid, :check_oids)
%{oids: oids3} = :sys.get_state(subscriber_manager_pid)
assert !Map.equal?(oids2, oids3)
Expand Down

0 comments on commit 533a54e

Please sign in to comment.