Skip to content

Commit

Permalink
Make retry strategies cluster-aware (#335)
Browse files Browse the repository at this point in the history
Co-authored-by: Harun Zengin <h.zengin@sonnen.de>
  • Loading branch information
whatyouhide and harunzengin committed Oct 4, 2023
1 parent 4d361f7 commit eec6e1c
Show file tree
Hide file tree
Showing 7 changed files with 511 additions and 114 deletions.
18 changes: 16 additions & 2 deletions lib/xandra.ex
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ defmodule Xandra do
exactly the same as calling `execute(conn, query, params_or_options, [])`.
When `query` is a batch query, successful results will always be `Xandra.Void`
structs.
structs. See `execute/4` for full documentation on all supported options.
When `{:error, error}` is returned, `error` can be either a `Xandra.Error` or
a `Xandra.ConnectionError` struct. See the module documentation for more
Expand All @@ -730,6 +730,9 @@ defmodule Xandra do
timestamp will apply to all the statements in the batch that do not
explicitly specify a timestamp.
See `execute/4` for full documentation on all supported options if `query` is not a batch
query.
## Examples
For examples on executing simple queries or prepared queries, see the
Expand Down Expand Up @@ -949,6 +952,17 @@ defmodule Xandra do
## Options
This function supports any arbitrary option, since Xandra passes those down
to the `Xandra.RetryStrategy` module passed in `:retry_strategy`. However, below
is a list of the options that are specific to Xandra and that Xandra uses when executing
the query. Note that we might *add* options to this list in the future, which could
potentially change the meaning of custom options you use to implement your own retry
strategy, and we wouldn't consider this a breaking change. Because of this, we recommend
*scoping* custom options in your retry strategy module (for example, by prefixing them
with `<my_module>_<option_name`).
Here are the Xandra-specific options:
#{NimbleOptions.docs(@execute_opts_schema)}
## Parameters
Expand Down Expand Up @@ -1177,7 +1191,7 @@ defmodule Xandra do
{xandra_opts, other_opts} = Keyword.split(options, @execute_opts_keys)
options = NimbleOptions.validate!(xandra_opts, @execute_opts_schema) ++ other_opts

RetryStrategy.run_with_retrying(options, fn ->
RetryStrategy.run_on_single_conn(options, fn ->
execute_without_retrying(conn, query, params, options)
end)
end
Expand Down
29 changes: 24 additions & 5 deletions lib/xandra/cluster.ex
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,13 @@ defmodule Xandra.Cluster do
end

def execute(cluster, %Batch{} = batch, options) when is_list(options) do
with_conn_and_retrying(cluster, options, &Xandra.execute(&1, batch, options))
options_without_retry_strategy = Keyword.delete(options, :retry_strategy)

with_conn_and_retrying(
cluster,
options,
&Xandra.execute(&1, batch, options_without_retry_strategy)
)
end

@doc """
Expand All @@ -433,7 +439,13 @@ defmodule Xandra.Cluster do
@spec execute(cluster, Xandra.statement() | Xandra.Prepared.t(), Xandra.values(), keyword) ::
{:ok, Xandra.result()} | {:error, Xandra.error()}
def execute(cluster, query, params, options) do
with_conn_and_retrying(cluster, options, &Xandra.execute(&1, query, params, options))
options_without_retry_strategy = Keyword.delete(options, :retry_strategy)

with_conn_and_retrying(
cluster,
options,
&Xandra.execute(&1, query, params, options_without_retry_strategy)
)
end

@doc """
Expand Down Expand Up @@ -507,13 +519,20 @@ defmodule Xandra.Cluster do
Pool.connected_hosts(cluster)
end

defp with_conn_and_retrying(cluster, options, fun) do
RetryStrategy.run_with_retrying(options, fn -> with_conn(cluster, fun) end)
defp with_conn_and_retrying(cluster, options, fun) when is_function(fun, 1) do
case Pool.checkout(cluster) do
{:error, :empty} ->
action = "checkout from cluster #{inspect(cluster)}"
{:error, ConnectionError.new(action, {:cluster, :not_connected})}

{:ok, connected_hosts} ->
RetryStrategy.run_on_cluster(options, connected_hosts, fun)
end
end

defp with_conn(cluster, fun) do
case Pool.checkout(cluster) do
{:ok, pool} ->
{:ok, [{pool, _host} | _connected_hosts]} ->
fun.(pool)

{:error, :empty} ->
Expand Down
49 changes: 40 additions & 9 deletions lib/xandra/cluster/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ defmodule Xandra.Cluster.Pool do
:gen_statem.stop(pid, reason, timeout)
end

@spec checkout(:gen_statem.server_ref()) :: {:ok, pid()} | {:error, :empty}
@spec checkout(:gen_statem.server_ref()) ::
{:ok, [{pid(), Host.t()}, ...]} | {:error, :empty}
def checkout(pid) do
:gen_statem.call(pid, :checkout)
end
Expand Down Expand Up @@ -348,6 +349,30 @@ defmodule Xandra.Cluster.Pool do
{:keep_state, data}
end

# For testing purposes
def handle_event(:info, {:add_test_hosts, hosts_with_status}, _state, %__MODULE__{} = data) do
data =
Enum.reduce(hosts_with_status, data, fn {%Host{} = host, status}, data_acc ->
data_acc =
update_in(data_acc.load_balancing_state, fn current_state ->
current_state = data_acc.load_balancing_module.host_added(current_state, host)
apply(data_acc.load_balancing_module, :"host_#{status}", [current_state, host])
end)

update_in(
data_acc.peers,
&Map.put(&1, Host.to_peername(host), %{
host: host,
status: status,
pool_pid: Process.spawn(fn -> nil end, []),
pool_ref: make_ref()
})
)
end)

{:keep_state, data}
end

# Sent by the connection itself.
def handle_event(
:info,
Expand Down Expand Up @@ -457,20 +482,26 @@ defmodule Xandra.Cluster.Pool do
data.load_balancing_module.query_plan(lb_state)
end)

# Find the first host in the plan for which we have a pool.
# Find all connected hosts
connected_hosts =
for host <- query_plan,
%{pool_pid: pid, host: host} = Map.get(data.peers, Host.to_peername(host)),
not is_nil(host),
is_pid(pid),
do: {pid, host}

reply =
query_plan
|> Stream.map(fn %Host{} = host -> Map.fetch(data.peers, Host.to_peername(host)) end)
|> Enum.find_value(_default = {:error, :empty}, fn
{:ok, %{pool_pid: pid}} when is_pid(pid) -> {:ok, pid}
_other -> nil
end)
case connected_hosts do
[] -> {:error, :empty}
connected_hosts -> {:ok, connected_hosts}
end

{data, {:reply, from, reply}}
end

defp handle_host_added(%__MODULE__{} = data, %Host{} = host) do
data = update_in(data.load_balancing_state, &data.load_balancing_module.host_added(&1, host))
data =
update_in(data.load_balancing_state, &data.load_balancing_module.host_added(&1, host))

data =
put_in(data.peers[Host.to_peername(host)], %{
Expand Down
Loading

0 comments on commit eec6e1c

Please sign in to comment.