diff --git a/config/runtime.exs b/config/runtime.exs index 3157e40aa..e7c934d42 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -32,6 +32,7 @@ db_ssl_ca_cert = System.get_env("DB_SSL_CA_CERT") queue_target = Env.get_integer("DB_QUEUE_TARGET", 5000) queue_interval = Env.get_integer("DB_QUEUE_INTERVAL", 5000) pool_size = Env.get_integer("DB_POOL_SIZE", 5) +master_region = System.get_env("DB_MASTER_REGION") after_connect_query_args = case System.get_env("DB_AFTER_CONNECT_QUERY") do @@ -105,7 +106,8 @@ config :realtime, Realtime.Repo, parameters: [application_name: "supabase_mt_realtime"], after_connect: after_connect_query_args, socket_options: socket_options, - ssl: ssl_opts + ssl: ssl_opts, + master_region: master_region config :realtime, websocket_max_heap_size: websocket_max_heap_size, diff --git a/config/test.exs b/config/test.exs index f28b6b89e..dca8017df 100644 --- a/config/test.exs +++ b/config/test.exs @@ -25,6 +25,8 @@ for repo <- [ pool: Ecto.Adapters.SQL.Sandbox end +config :realtime, Realtime.Repo, master_region: "us-east-1" + # Running server during tests to run integration tests config :realtime, RealtimeWeb.Endpoint, http: [port: 4002], diff --git a/lib/realtime/repo.ex b/lib/realtime/repo.ex index f3850712a..c68ea2f8e 100644 --- a/lib/realtime/repo.ex +++ b/lib/realtime/repo.ex @@ -3,9 +3,10 @@ defmodule Realtime.Repo do use Ecto.Repo, otp_app: :realtime, - adapter: Ecto.Adapters.Postgres + adapter: Realtime.Repo.RegionAdapter import Ecto.Query + alias Ecto.Adapters.SQL def with_dynamic_repo(config, callback) do default_dynamic_repo = get_dynamic_repo() @@ -226,16 +227,16 @@ defmodule Realtime.Repo do %Ecto.Changeset{data: %{id: id, __struct__: struct}, changes: changes} = changeset changes = Keyword.new(changes) query = from(c in struct, where: c.id == ^id, select: c, update: [set: ^changes]) - {:ok, to_sql(:update_all, query)} + {:ok, SQL.to_sql(:update_all, __MODULE__, query)} end defp run_all_query(conn, query, opts) do - {query, args} = to_sql(:all, query) + {query, args} = SQL.to_sql(:all, __MODULE__, query) run_query_with_trap(conn, query, args, opts) end defp run_delete_query(conn, query) do - {query, args} = to_sql(:delete_all, query) + {query, args} = SQL.to_sql(:delete_all, __MODULE__, query) run_query_with_trap(conn, query, args) end diff --git a/lib/realtime/repo/region_adapter.ex b/lib/realtime/repo/region_adapter.ex new file mode 100644 index 000000000..d546423d5 --- /dev/null +++ b/lib/realtime/repo/region_adapter.ex @@ -0,0 +1,311 @@ +defmodule Realtime.Repo.RegionAdapter do + @moduledoc """ + Adapter that routes calls to the appropriate node based on the region. + """ + @behaviour Ecto.Adapter + @behaviour Ecto.Adapter.Storage + @behaviour Ecto.Adapter.Migration + @behaviour Ecto.Adapter.Queryable + @behaviour Ecto.Adapter.Schema + @behaviour Ecto.Adapter.Transaction + @behaviour Ecto.Adapters.SQL.Connection + + alias Realtime.GenRpc + + @impl Ecto.Adapter + defmacro __before_compile__(_env) do + quote do + end + end + + @impl Ecto.Adapter + def init(opts) do + repo = Keyword.get(opts, :repo, Realtime.Repo) + telemetry_prefix = Keyword.get(opts, :telemetry_prefix, [:realtime, :repo]) + opts = Keyword.put_new(opts, :repo, repo) |> Keyword.put_new(:telemetry_prefix, telemetry_prefix) + + {:ok, child_spec, adapter_meta} = Ecto.Adapters.Postgres.init(opts) + adapter_meta = Map.put(adapter_meta, :remote, target_node()) + + {:ok, child_spec, adapter_meta} + end + + defp target_node() do + region = Application.get_env(:realtime, :region) + master_region = Application.get_env(:realtime, :master_region, region) + + with false <- master_region == region, + {:ok, node} <- Realtime.Nodes.node_from_region(master_region, node()) do + node + else + _ -> node() + end + end + + defp run_on_target(target_node, function, args) when target_node == node() do + apply(Ecto.Adapters.Postgres, function, args) + end + + defp run_on_target(target_node, function, args) do + GenRpc.call(target_node, __MODULE__, function, args, []) + end + + @impl Ecto.Adapter + def ensure_all_started(config, type) do + target_node = target_node() + run_on_target(target_node, :ensure_all_started, [config, type]) + end + + @impl Ecto.Adapter + def checked_out?(adapter_meta) do + if target_node() == node(), + do: Ecto.Adapters.Postgres.checked_out?(adapter_meta), + else: raise("checked_out? is not supported on remote nodes") + end + + @impl Ecto.Adapter + def checkout(adapter_meta, config, function) do + if target_node() == node(), + do: Ecto.Adapters.Postgres.checkout(adapter_meta, config, function), + else: raise("checkout is not supported on remote nodes") + end + + @impl Ecto.Adapter + def dumpers(primitive_type, ecto_type) do + target_node = target_node() + run_on_target(target_node, :dumpers, [primitive_type, ecto_type]) + end + + @impl Ecto.Adapter + def loaders(primitive_type, ecto_type) do + target_node = target_node() + run_on_target(target_node, :loaders, [primitive_type, ecto_type]) + end + + @impl Ecto.Adapter.Queryable + def prepare(operation, query) do + target_node = target_node() + run_on_target(target_node, :prepare, [operation, query]) + end + + @impl Ecto.Adapter.Queryable + def execute(adapter_meta, query_meta, query, params, opts) do + target_node = target_node() + run_on_target(target_node, :execute, [adapter_meta, query_meta, query, params, opts]) + end + + @impl Ecto.Adapter.Queryable + def stream(adapter_meta, query_meta, query, params, opts) do + if target_node() == node(), + do: Ecto.Adapters.Postgres.stream(adapter_meta, query_meta, query, params, opts), + else: raise("stream is not supported on remote nodes") + end + + @impl Ecto.Adapter.Schema + def autogenerate(field_type) do + target_node = target_node() + run_on_target(target_node, :autogenerate, [field_type]) + end + + @impl Ecto.Adapter.Schema + def insert_all(adapter_meta, schema_meta, header, rows, on_conflict, returning, placeholders, opts) do + target_node = target_node() + + run_on_target(target_node, :insert_all, [ + adapter_meta, + schema_meta, + header, + rows, + on_conflict, + returning, + placeholders, + opts + ]) + end + + @impl Ecto.Adapter.Schema + def insert(adapter_meta, schema_meta, params, on_conflict, returning, opts) do + target_node = target_node() + run_on_target(target_node, :insert, [adapter_meta, schema_meta, params, on_conflict, returning, opts]) + end + + @impl Ecto.Adapter.Schema + def update(adapter_meta, schema_meta, fields, params, returning, opts) do + target_node = target_node() + run_on_target(target_node, :update, [adapter_meta, schema_meta, fields, params, returning, opts]) + end + + @impl Ecto.Adapter.Schema + def delete(adapter_meta, schema_meta, params, returning, opts) do + target_node = target_node() + run_on_target(target_node, :delete, [adapter_meta, schema_meta, params, returning, opts]) + end + + @impl Ecto.Adapter.Transaction + def transaction(adapter_meta, opts, fun) do + if target_node() == node(), + do: Ecto.Adapters.Postgres.transaction(adapter_meta, opts, fun), + else: raise("transaction is not supported on remote nodes") + end + + @impl Ecto.Adapter.Transaction + def in_transaction?(adapter_meta) do + if target_node() == node(), + do: Ecto.Adapters.Postgres.in_transaction?(adapter_meta), + else: raise("in_transaction? is not supported on remote nodes") + end + + @impl Ecto.Adapter.Transaction + @spec rollback(Ecto.Adapter.adapter_meta(), term()) :: no_return() + def rollback(adapter_meta, value) do + if target_node() == node(), + do: Ecto.Adapters.Postgres.rollback(adapter_meta, value), + else: raise("rollback is not supported on remote nodes") + end + + @impl Ecto.Adapter.Storage + def storage_up(opts) do + if target_node() == node(), + do: Ecto.Adapters.Postgres.storage_up(opts), + else: raise("storage_up is not supported on remote nodes") + end + + @impl Ecto.Adapter.Storage + def storage_down(opts) do + if target_node() == node(), + do: Ecto.Adapters.Postgres.storage_down(opts), + else: raise("storage_down is not supported on remote nodes") + end + + @impl Ecto.Adapter.Storage + def storage_status(opts) do + if target_node() == node(), + do: Ecto.Adapters.Postgres.storage_status(opts), + else: raise("storage_status is not supported on remote nodes") + end + + @impl Ecto.Adapters.SQL.Connection + def child_spec(opts) do + target_node = target_node() + run_on_target(target_node, :child_spec, [opts]) + end + + @impl Ecto.Adapters.SQL.Connection + def prepare_execute(conn, name, statement, params, opts) do + target_node = target_node() + run_on_target(target_node, :prepare_execute, [conn, name, statement, params, opts]) + end + + @impl Ecto.Adapters.SQL.Connection + def execute(conn, cached, params, opts) do + target_node = target_node() + run_on_target(target_node, :execute, [conn, cached, params, opts]) + end + + @impl Ecto.Adapters.SQL.Connection + def query(conn, statement, params, opts) do + target_node = target_node() + run_on_target(target_node, :query, [conn, statement, params, opts]) + end + + @impl Ecto.Adapters.SQL.Connection + @spec query_many(DBConnection.conn(), iodata(), Ecto.Adapters.SQL.query_params(), Keyword.t()) :: + no_return() + def query_many(conn, statement, params, opts) do + target_node = target_node() + run_on_target(target_node, :query_many, [conn, statement, params, opts]) + end + + @impl Ecto.Adapters.SQL.Connection + def stream(conn, statement, params, opts) do + target_node = target_node() + run_on_target(target_node, :stream, [conn, statement, params, opts]) + end + + @impl Ecto.Adapters.SQL.Connection + def to_constraints(exception, opts) do + target_node = target_node() + run_on_target(target_node, :to_constraints, [exception, opts]) + end + + @impl Ecto.Adapters.SQL.Connection + def all(query, as_prefix \\ []) do + target_node = target_node() + run_on_target(target_node, :all, [query, as_prefix]) + end + + @impl Ecto.Adapters.SQL.Connection + def update_all(query, prefix \\ nil) do + target_node = target_node() + run_on_target(target_node, :update_all, [query, prefix]) + end + + @impl Ecto.Adapters.SQL.Connection + def delete_all(query) do + target_node = target_node() + run_on_target(target_node, :delete_all, [query]) + end + + @impl Ecto.Adapters.SQL.Connection + def insert(prefix, table, header, rows, on_conflict, returning, placeholders) do + target_node = target_node() + run_on_target(target_node, :insert, [prefix, table, header, rows, on_conflict, returning, placeholders]) + end + + @impl Ecto.Adapters.SQL.Connection + def update(prefix, table, fields, filters, returning) do + target_node = target_node() + run_on_target(target_node, :update, [prefix, table, fields, filters, returning]) + end + + @impl Ecto.Adapters.SQL.Connection + def delete(prefix, table, filters, returning) do + target_node = target_node() + run_on_target(target_node, :delete, [prefix, table, filters, returning]) + end + + @impl Ecto.Adapters.SQL.Connection + def explain_query(conn, query, params, opts) do + target_node = target_node() + run_on_target(target_node, :explain_query, [conn, query, params, opts]) + end + + @impl Ecto.Adapters.SQL.Connection + def execute_ddl(command) do + target_node = target_node() + run_on_target(target_node, :execute_ddl, [command]) + end + + @impl Ecto.Adapters.SQL.Connection + def ddl_logs(result) do + target_node = target_node() + run_on_target(target_node, :ddl_logs, [result]) + end + + @impl Ecto.Adapters.SQL.Connection + def table_exists_query(table) do + target_node = target_node() + run_on_target(target_node, :table_exists_query, [table]) + end + + @impl Ecto.Adapter.Migration + def execute_ddl(meta, definition, opts) do + if target_node() == node(), + do: Ecto.Adapters.Postgres.execute_ddl(meta, definition, opts), + else: raise("execute_ddl is not supported on remote nodes") + end + + @impl Ecto.Adapter.Migration + def supports_ddl_transaction? do + if target_node() == node(), + do: Ecto.Adapters.Postgres.supports_ddl_transaction?(), + else: raise("supports_ddl_transaction? is not supported on remote nodes") + end + + @impl Ecto.Adapter.Migration + def lock_for_migrations(meta, opts, fun) do + if target_node() == node(), + do: Ecto.Adapters.Postgres.lock_for_migrations(meta, opts, fun), + else: raise("lock_for_migrations is not supported on remote nodes") + end +end diff --git a/test/realtime/repo/region_adapter_test.exs b/test/realtime/repo/region_adapter_test.exs new file mode 100644 index 000000000..1221dfc7c --- /dev/null +++ b/test/realtime/repo/region_adapter_test.exs @@ -0,0 +1,497 @@ +defmodule Realtime.Repo.RegionAdapterTest do + use ExUnit.Case, async: true + use Mimic + doctest Realtime.Repo.RegionAdapter + alias Realtime.Repo.RegionAdapter + alias Realtime.Nodes + alias Realtime.GenRpc + + setup :set_mimic_global + setup :verify_on_exit! + + describe "init/1" do + test "initializes the adapter with current node when in master region" do + previous_region = Application.get_env(:realtime, :region) + previous_master_region = Application.get_env(:realtime, :master_region) + + on_exit(fn -> + Application.put_env(:realtime, :region, previous_region) + Application.put_env(:realtime, :master_region, previous_master_region) + end) + + Application.put_env(:realtime, :region, "us-east-1") + Application.put_env(:realtime, :master_region, "us-east-1") + {:ok, _child_spec, adapter_meta} = RegionAdapter.init(Application.get_env(:realtime, Realtime.Repo)) + + assert adapter_meta[:remote] == node() + end + + test "initializes the adapter with remote node when in non-master region" do + previous_region = Application.get_env(:realtime, :region) + previous_master_region = Application.get_env(:realtime, :master_region) + + on_exit(fn -> + Application.put_env(:realtime, :region, previous_region) + Application.put_env(:realtime, :master_region, previous_master_region) + end) + + master_region = "ap-southeast-2" + remote_node = :"remote_master_node@127.0.0.1" + + expect(Nodes, :node_from_region, fn ^master_region, _current_node -> + {:ok, remote_node} + end) + + Application.put_env(:realtime, :region, "us-east-1") + Application.put_env(:realtime, :master_region, master_region) + + {:ok, _child_spec, adapter_meta} = RegionAdapter.init(Application.get_env(:realtime, Realtime.Repo)) + + assert adapter_meta[:remote] == remote_node + end + + test "initializes the adapter with current node when master region has no nodes" do + previous_region = Application.get_env(:realtime, :region) + previous_master_region = Application.get_env(:realtime, :master_region) + + on_exit(fn -> + Application.put_env(:realtime, :region, previous_region) + Application.put_env(:realtime, :master_region, previous_master_region) + end) + + Application.put_env(:realtime, :region, "us-east-1") + Application.put_env(:realtime, :master_region, "non-existent-region") + + {:ok, _child_spec, adapter_meta} = RegionAdapter.init(Application.get_env(:realtime, Realtime.Repo)) + + assert adapter_meta[:remote] == node() + end + end + + describe "local operations (in master region)" do + setup do + previous_region = Application.get_env(:realtime, :region) + previous_master_region = Application.get_env(:realtime, :master_region) + + on_exit(fn -> + Application.put_env(:realtime, :region, previous_region) + Application.put_env(:realtime, :master_region, previous_master_region) + end) + + Application.put_env(:realtime, :region, "us-east-1") + Application.put_env(:realtime, :master_region, "us-east-1") + + adapter_meta = %{remote: nil, pid: self()} + %{adapter_meta: adapter_meta} + end + + test "execute calls Postgres adapter directly when in master region", %{adapter_meta: adapter_meta} do + query_meta = %{} + query = %Ecto.Query{} + params = [] + opts = [] + expected_result = {:ok, %{num_rows: 1, rows: [[1]]}} + + reject(GenRpc, :call, 5) + + expect(Ecto.Adapters.Postgres, :execute, fn ^adapter_meta, ^query_meta, ^query, ^params, ^opts -> + expected_result + end) + + result = RegionAdapter.execute(adapter_meta, query_meta, query, params, opts) + assert result == expected_result + end + + test "insert calls Postgres adapter directly when in master region", %{adapter_meta: adapter_meta} do + schema_meta = %{} + params = %{name: "test"} + on_conflict = :nothing + returning = [:id] + opts = [] + expected_result = {:ok, [id: 1]} + + reject(GenRpc, :call, 5) + + expect(Ecto.Adapters.Postgres, :insert, fn ^adapter_meta, + ^schema_meta, + ^params, + ^on_conflict, + ^returning, + ^opts -> + expected_result + end) + + result = RegionAdapter.insert(adapter_meta, schema_meta, params, on_conflict, returning, opts) + assert result == expected_result + end + + test "update calls Postgres adapter directly when in master region", %{adapter_meta: adapter_meta} do + schema_meta = %{} + fields = [:name] + params = %{name: "updated"} + returning = [:id] + opts = [] + expected_result = {:ok, [id: 1]} + + reject(GenRpc, :call, 5) + + expect(Ecto.Adapters.Postgres, :update, fn ^adapter_meta, ^schema_meta, ^fields, ^params, ^returning, ^opts -> + expected_result + end) + + result = RegionAdapter.update(adapter_meta, schema_meta, fields, params, returning, opts) + assert result == expected_result + end + + test "delete calls Postgres adapter directly when in master region", %{adapter_meta: adapter_meta} do + schema_meta = %{} + params = %{id: 1} + returning = [:id] + opts = [] + expected_result = {:ok, [id: 1]} + + reject(GenRpc, :call, 5) + + expect(Ecto.Adapters.Postgres, :delete, fn ^adapter_meta, ^schema_meta, ^params, ^returning, ^opts -> + expected_result + end) + + result = RegionAdapter.delete(adapter_meta, schema_meta, params, returning, opts) + assert result == expected_result + end + + test "transaction calls Postgres adapter directly when in master region", %{adapter_meta: adapter_meta} do + opts = [] + fun = fn -> :ok end + expected_result = {:ok, :ok} + + reject(GenRpc, :call, 5) + + expect(Ecto.Adapters.Postgres, :transaction, fn ^adapter_meta, ^opts, ^fun -> + expected_result + end) + + result = RegionAdapter.transaction(adapter_meta, opts, fun) + assert result == expected_result + end + + test "checked_out? calls Postgres adapter directly when in master region", %{adapter_meta: adapter_meta} do + expected_result = false + + expect(Ecto.Adapters.Postgres, :checked_out?, fn ^adapter_meta -> + expected_result + end) + + result = RegionAdapter.checked_out?(adapter_meta) + assert result == expected_result + end + + test "insert_all calls Postgres adapter directly when in master region", %{adapter_meta: adapter_meta} do + schema_meta = %{} + header = [:name] + rows = [[{:name, "test1"}], [{:name, "test2"}]] + on_conflict = :nothing + returning = [:id] + placeholders = [] + opts = [] + expected_result = {2, nil} + + reject(GenRpc, :call, 5) + + expect(Ecto.Adapters.Postgres, :insert_all, fn ^adapter_meta, + ^schema_meta, + ^header, + ^rows, + ^on_conflict, + ^returning, + ^placeholders, + ^opts -> + expected_result + end) + + result = + RegionAdapter.insert_all(adapter_meta, schema_meta, header, rows, on_conflict, returning, placeholders, opts) + + assert result == expected_result + end + + test "stream calls Postgres adapter directly when in master region", %{adapter_meta: adapter_meta} do + query_meta = %{} + query = %Ecto.Query{} + params = [] + opts = [] + expected_stream = %DBConnection.Stream{} + + reject(GenRpc, :call, 5) + + expect(Ecto.Adapters.Postgres, :stream, fn ^adapter_meta, ^query_meta, ^query, ^params, ^opts -> + expected_stream + end) + + result = RegionAdapter.stream(adapter_meta, query_meta, query, params, opts) + assert result == expected_stream + end + + test "in_transaction? calls Postgres adapter directly when in master region", %{adapter_meta: adapter_meta} do + expected_result = false + + reject(GenRpc, :call, 5) + + expect(Ecto.Adapters.Postgres, :in_transaction?, fn ^adapter_meta -> + expected_result + end) + + result = RegionAdapter.in_transaction?(adapter_meta) + assert result == expected_result + end + + test "checkout calls Postgres adapter directly when in master region", %{adapter_meta: adapter_meta} do + config = [] + function = fn -> :ok end + expected_result = :ok + + expect(Ecto.Adapters.Postgres, :checkout, fn ^adapter_meta, ^config, ^function -> + expected_result + end) + + result = RegionAdapter.checkout(adapter_meta, config, function) + assert result == expected_result + end + end + + describe "remote operations (in non-master region)" do + setup do + previous_region = Application.get_env(:realtime, :region) + previous_master_region = Application.get_env(:realtime, :master_region) + + on_exit(fn -> + Application.put_env(:realtime, :region, previous_region) + Application.put_env(:realtime, :master_region, previous_master_region) + end) + + master_region = "ap-southeast-2" + remote_node = :"remote_master_node@127.0.0.1" + + Application.put_env(:realtime, :region, "us-east-1") + Application.put_env(:realtime, :master_region, master_region) + + stub(Nodes, :node_from_region, fn ^master_region, _current_node -> + {:ok, remote_node} + end) + + adapter_meta = %{remote: remote_node, pid: self()} + %{adapter_meta: adapter_meta, remote_node: remote_node} + end + + test "execute routes via GenRpc when in non-master region", %{adapter_meta: adapter_meta, remote_node: remote_node} do + query_meta = %{} + query = %Ecto.Query{} + params = [] + opts = [] + expected_result = {:ok, %{num_rows: 1, rows: [[1]]}} + + expect(GenRpc, :call, fn ^remote_node, + Realtime.Repo.RegionAdapter, + :execute, + [^adapter_meta, ^query_meta, ^query, ^params, ^opts], + [] -> + expected_result + end) + + result = RegionAdapter.execute(adapter_meta, query_meta, query, params, opts) + assert result == expected_result + end + + test "insert routes via GenRpc when in non-master region", %{adapter_meta: adapter_meta, remote_node: remote_node} do + schema_meta = %{} + params = %{name: "test"} + on_conflict = :nothing + returning = [:id] + opts = [] + expected_result = {:ok, %{id: 1, name: "test"}} + + expect(GenRpc, :call, fn ^remote_node, + Realtime.Repo.RegionAdapter, + :insert, + [^adapter_meta, ^schema_meta, ^params, ^on_conflict, ^returning, ^opts], + [] -> + expected_result + end) + + result = RegionAdapter.insert(adapter_meta, schema_meta, params, on_conflict, returning, opts) + assert result == expected_result + end + + test "update routes via GenRpc when in non-master region", %{adapter_meta: adapter_meta, remote_node: remote_node} do + schema_meta = %{} + fields = [:name] + params = %{name: "updated"} + returning = [:id] + opts = [] + expected_result = {:ok, %{id: 1, name: "updated"}} + + expect(GenRpc, :call, fn ^remote_node, + Realtime.Repo.RegionAdapter, + :update, + [^adapter_meta, ^schema_meta, ^fields, ^params, ^returning, ^opts], + [] -> + expected_result + end) + + result = RegionAdapter.update(adapter_meta, schema_meta, fields, params, returning, opts) + assert result == expected_result + end + + test "delete routes via GenRpc when in non-master region", %{adapter_meta: adapter_meta, remote_node: remote_node} do + schema_meta = %{} + params = %{id: 1} + returning = [:id] + opts = [] + expected_result = {:ok, %{id: 1}} + + expect(GenRpc, :call, fn ^remote_node, + Realtime.Repo.RegionAdapter, + :delete, + [^adapter_meta, ^schema_meta, ^params, ^returning, ^opts], + [] -> + expected_result + end) + + result = RegionAdapter.delete(adapter_meta, schema_meta, params, returning, opts) + assert result == expected_result + end + + test "insert_all routes via GenRpc when in non-master region", %{ + adapter_meta: adapter_meta, + remote_node: remote_node + } do + schema_meta = %{} + header = [:name] + rows = [[:name], ["test"]] + on_conflict = :nothing + returning = [:id] + placeholders = [] + opts = [] + expected_result = {:ok, 1} + + expect(GenRpc, :call, fn ^remote_node, + Realtime.Repo.RegionAdapter, + :insert_all, + [ + ^adapter_meta, + ^schema_meta, + ^header, + ^rows, + ^on_conflict, + ^returning, + ^placeholders, + ^opts + ], + [] -> + expected_result + end) + + result = + RegionAdapter.insert_all(adapter_meta, schema_meta, header, rows, on_conflict, returning, placeholders, opts) + + assert result == expected_result + end + + test "stream raises error when in non-master region", %{adapter_meta: adapter_meta} do + query_meta = %{} + query = %Ecto.Query{} + params = [] + opts = [] + + assert_raise RuntimeError, "stream is not supported on remote nodes", fn -> + RegionAdapter.stream(adapter_meta, query_meta, query, params, opts) + end + end + + test "transaction raises error when in non-master region", %{adapter_meta: adapter_meta} do + opts = [] + fun = fn -> :ok end + + assert_raise RuntimeError, "transaction is not supported on remote nodes", fn -> + RegionAdapter.transaction(adapter_meta, opts, fun) + end + end + + test "in_transaction? raises error when in non-master region", %{adapter_meta: adapter_meta} do + assert_raise RuntimeError, "in_transaction? is not supported on remote nodes", fn -> + RegionAdapter.in_transaction?(adapter_meta) + end + end + + test "rollback raises error when in non-master region", %{adapter_meta: adapter_meta} do + value = :some_value + + assert_raise RuntimeError, "rollback is not supported on remote nodes", fn -> + RegionAdapter.rollback(adapter_meta, value) + end + end + + test "checked_out? raises error when in non-master region", %{adapter_meta: adapter_meta} do + assert_raise RuntimeError, "checked_out? is not supported on remote nodes", fn -> + RegionAdapter.checked_out?(adapter_meta) + end + end + + test "checkout raises error when in non-master region", %{adapter_meta: adapter_meta} do + config = [] + function = fn -> :ok end + + assert_raise RuntimeError, "checkout is not supported on remote nodes", fn -> + RegionAdapter.checkout(adapter_meta, config, function) + end + end + + test "storage_up raises error when in non-master region" do + opts = [] + + assert_raise RuntimeError, "storage_up is not supported on remote nodes", fn -> + RegionAdapter.storage_up(opts) + end + end + + test "storage_down raises error when in non-master region" do + opts = [] + + assert_raise RuntimeError, "storage_down is not supported on remote nodes", fn -> + RegionAdapter.storage_down(opts) + end + end + + test "storage_status raises error when in non-master region" do + opts = [] + + assert_raise RuntimeError, "storage_status is not supported on remote nodes", fn -> + RegionAdapter.storage_status(opts) + end + end + + test "execute_ddl raises error when in non-master region", %{adapter_meta: adapter_meta} do + definition = {:create, :table, []} + opts = [] + + assert_raise RuntimeError, "execute_ddl is not supported on remote nodes", fn -> + RegionAdapter.execute_ddl(adapter_meta, definition, opts) + end + end + + test "supports_ddl_transaction? raises error when in non-master region" do + assert_raise RuntimeError, "supports_ddl_transaction? is not supported on remote nodes", fn -> + RegionAdapter.supports_ddl_transaction?() + end + end + + test "lock_for_migrations raises error when in non-master region", %{adapter_meta: adapter_meta} do + opts = [] + fun = fn -> :ok end + + assert_raise RuntimeError, "lock_for_migrations is not supported on remote nodes", fn -> + RegionAdapter.lock_for_migrations(adapter_meta, opts, fun) + end + end + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index 1d0f4d3ad..6332515be 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -47,6 +47,7 @@ end) Ecto.Adapters.SQL.Sandbox.mode(Realtime.Repo, :manual) Mimic.copy(:syn) +Mimic.copy(Ecto.Adapters.Postgres) Mimic.copy(Extensions.PostgresCdcRls.Replications) Mimic.copy(Extensions.PostgresCdcRls.Subscriptions) Mimic.copy(Realtime.Database)