From b54ab1d5e5cb07efe31c9b6e89715ce69aa3c871 Mon Sep 17 00:00:00 2001 From: Aaron Seo Date: Wed, 29 Jan 2025 11:10:52 -0800 Subject: [PATCH 1/3] Add force checkpoint functions for quorum queues and command line tool --- deps/rabbit/src/rabbit_quorum_queue.erl | 35 ++++++ deps/rabbit/test/quorum_queue_SUITE.erl | 76 +++++++++++++ .../commands/force_checkpoint_command.ex | 107 ++++++++++++++++++ .../queues/force_checkpoint_command_test.exs | 64 +++++++++++ 4 files changed, 282 insertions(+) create mode 100644 deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/force_checkpoint_command.ex create mode 100644 deps/rabbitmq_cli/test/queues/force_checkpoint_command_test.exs diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index c9fb877b38dc..f62c67dea6ba 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -77,6 +77,8 @@ force_vhost_queues_shrink_member_to_current_member/1, force_all_queues_shrink_member_to_current_member/0]). +-export([force_checkpoint/2, force_checkpoint_on_queue/1]). + %% for backwards compatibility -export([file_handle_leader_reservation/1, file_handle_other_reservation/0, @@ -2084,6 +2086,39 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis rabbit_log:warning("Shrinking finished"), ok. +force_checkpoint_on_queue(QName) -> + Node = node(), + QNameFmt = rabbit_misc:rs(QName), + case rabbit_amqqueue:lookup(QName) of + {ok, Q} when ?amqqueue_is_classic(Q) -> + {error, classic_queue_not_supported}; + {ok, Q} when ?amqqueue_is_quorum(Q) -> + {RaName, _} = amqqueue:get_pid(Q), + rpc:call(Node, ra, cast_aux_command, [{RaName, Node}, force_checkpoint]), + rabbit_log:debug("Sent command to force checkpoint ~ts", [QNameFmt]); + {ok, _Q} -> + {error, not_quorum_queue}; + {error, _} = E -> + E + end. + +force_checkpoint(VhostSpec, QueueSpec) -> + [begin + QName = amqqueue:get_name(Q), + case force_checkpoint_on_queue(QName) of + ok -> + {QName, {ok}}; + {error, Err} -> + rabbit_log:warning("~ts: failed to force checkpoint, error: ~w", + [rabbit_misc:rs(QName), Err]), + {QName, {error, Err}} + end + end + || Q <- rabbit_amqqueue:list(), + amqqueue:get_type(Q) == ?MODULE, + is_match(amqqueue:get_vhost(Q), VhostSpec) + andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)]. + is_minority(All, Up) -> MinQuorum = length(All) div 2 + 1, length(Up) < MinQuorum. diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index fdb0a8c5dd8a..667d8c3ec3a7 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -97,6 +97,8 @@ groups() -> force_shrink_member_to_current_member, force_all_queues_shrink_member_to_current_member, force_vhost_queues_shrink_member_to_current_member, + force_checkpoint_on_queue, + force_checkpoint, policy_repair, gh_12635, replica_states @@ -1308,6 +1310,80 @@ force_vhost_queues_shrink_member_to_current_member(Config) -> ?assertEqual(3, length(Nodes0)) end || Q <- QQs, VHost <- VHosts]. +force_checkpoint_on_queue(Config) -> + [Server0, _Server1, _Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + QQ = ?config(queue_name, Config), + RaName = ra_name(QQ), + QName = rabbit_misc:r(<<"/">>, queue, QQ), + + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + rabbit_ct_client_helpers:publish(Ch, QQ, 3), + wait_for_messages_ready([Server0], RaName, 3), + + % Wait for initial checkpoint and make sure it's 0; checkpoint hasn't been triggered yet. + rabbit_ct_helpers:await_condition( + fun() -> + {ok, #{aux := Aux1}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]), + {aux_v3, _, _, _, _, _, _, {checkpoint, Index, _, _, _, _, _}} = Aux1, + case Index of + 0 -> true; + _ -> false + end + end), + + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_checkpoint_on_queue, [QName]), + + % Wait for initial checkpoint and make sure it's not 0 + rabbit_ct_helpers:await_condition( + fun() -> + {ok, #{aux := Aux1}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]), + {aux_v3, _, _, _, _, _, _, {checkpoint, Index, _, _, _, _, _}} = Aux1, + case Index of + 0 -> false; + _ -> true + end + end). + +force_checkpoint(Config) -> + [Server0, _Server1, _Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + QQ = ?config(queue_name, Config), + CQ = <<"force_checkpoint_cq">>, + RaName = ra_name(QQ), + + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + ?assertEqual({'queue.declare_ok', CQ, 0, 0}, + declare(Ch, CQ, [{<<"x-queue-type">>, longstr, <<"classic">>}])), + + rabbit_ct_client_helpers:publish(Ch, QQ, 3), + wait_for_messages_ready([Server0], RaName, 3), + + meck:expect(rabbit_quorum_queue, force_checkpoint_on_queue, fun(Q) -> ok end), + + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + force_checkpoint, [<<".*">>, <<".*">>]), + + % Waiting here to make sure checkpoint has been forced + rabbit_ct_helpers:await_condition( + fun() -> + {ok, #{aux := Aux1}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]), + {aux_v3, _, _, _, _, _, _, {checkpoint, Index, _, _, _, _, _}} = Aux1, + case Index of + 0 -> false; + _ -> true + end + end), + + % Make sure force_checkpoint_on_queue was only called for the quorun queue + ?assertEqual(1, meck:num_calls(rabbit_quorum_queue, force_checkpoint_on_queue, '_')). % Tests that, if the process of a QQ is dead in the moment of declaring a policy % that affects such queue, when the process is made available again, the policy diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/force_checkpoint_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/force_checkpoint_command.ex new file mode 100644 index 000000000000..47ed966f2fcd --- /dev/null +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/force_checkpoint_command.ex @@ -0,0 +1,107 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do + alias RabbitMQ.CLI.Core.{DocGuide} + + @behaviour RabbitMQ.CLI.CommandBehaviour + + defp default_opts, + do: %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false} + + def switches(), + do: [ + vhost_pattern: :string, + queue_pattern: :string, + errors_only: :boolean + ] + + def merge_defaults(args, opts) do + {args, Map.merge(default_opts(), opts)} + end + + use RabbitMQ.CLI.Core.RequiresRabbitAppRunning + use RabbitMQ.CLI.Core.AcceptsNoPositionalArguments + + def run([], %{ + node: node_name, + vhost_pattern: vhost_pat, + queue_pattern: queue_pat, + errors_only: errors_only + }) do + args = [vhost_pat, queue_pat] + + case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :force_checkpoint, args) do + {:error, _} = error -> + error + + {:badrpc, _} = error -> + error + + results when errors_only -> + for {{:resource, vhost, _kind, name}, {:error, _, _} = res} <- results, + do: [ + {:vhost, vhost}, + {:name, name}, + {:result, format_result(res)} + ] + + results -> + for {{:resource, vhost, _kind, name}, res} <- results, + do: [ + {:vhost, vhost}, + {:name, name}, + {:result, format_result(res)} + ] + end + end + + use RabbitMQ.CLI.DefaultOutput + + def formatter(), do: RabbitMQ.CLI.Formatters.Table + + def usage, + do: "force_checkpoint [--vhost-pattern ] [--queue-pattern ]" + + def usage_additional do + [ + ["--queue-pattern ", "regular expression to match queue names"], + ["--vhost-pattern ", "regular expression to match virtual host names"], + ["--errors-only", "only list queues which reported an error"] + ] + end + + def usage_doc_guides() do + [ + DocGuide.quorum_queues() + ] + end + + def help_section, do: :replication + + def description, + do: "Forces checkpoints for all matching quorum queues" + + def banner([], _) do + "Forcing checkpoint for all matching quorum queues..." + end + + # + # Implementation + # + + defp format_result({:ok}) do + "ok" + end + + defp format_result({:error, :timeout}) do + "error: the operation timed out and may not have been completed" + end + + defp format_result({:error, err}) do + to_string(:io_lib.format("error: ~W", [err, 10])) + end +end diff --git a/deps/rabbitmq_cli/test/queues/force_checkpoint_command_test.exs b/deps/rabbitmq_cli/test/queues/force_checkpoint_command_test.exs new file mode 100644 index 000000000000..67c2ac38552e --- /dev/null +++ b/deps/rabbitmq_cli/test/queues/force_checkpoint_command_test.exs @@ -0,0 +1,64 @@ +## This Source Code Form is subject to the terms of the Mozilla Public +## License, v. 2.0. If a copy of the MPL was not distributed with this +## file, You can obtain one at https://mozilla.org/MPL/2.0/. +## +## Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. + +defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommandTest do + use ExUnit.Case, async: false + import TestHelper + + @command RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand + + setup_all do + RabbitMQ.CLI.Core.Distribution.start() + + :ok + end + + setup context do + {:ok, + opts: %{ + node: get_rabbit_hostname(), + timeout: context[:test_timeout] || 30000, + vhost_pattern: ".*", + queue_pattern: ".*", + errors_only: false + }} + end + + test "merge_defaults: defaults to reporting complete results" do + assert @command.merge_defaults([], %{}) == + {[], + %{ + vhost_pattern: ".*", + queue_pattern: ".*", + errors_only: false + }} + end + + test "validate: accepts no positional arguments" do + assert @command.validate([], %{}) == :ok + end + + test "validate: any positional arguments fail validation" do + assert @command.validate(["quorum-queue-a"], %{}) == {:validation_failure, :too_many_args} + + assert @command.validate(["quorum-queue-a", "two"], %{}) == + {:validation_failure, :too_many_args} + + assert @command.validate(["quorum-queue-a", "two", "three"], %{}) == + {:validation_failure, :too_many_args} + end + + @tag test_timeout: 3000 + test "run: targeting an unreachable node throws a badrpc", context do + assert match?( + {:badrpc, _}, + @command.run( + [], + Map.merge(context[:opts], %{node: :jake@thedog}) + ) + ) + end +end From 12bf3e094eceb7ad037faadb7bca1cc8c57e43bb Mon Sep 17 00:00:00 2001 From: Aaron Seo Date: Tue, 4 Feb 2025 14:06:29 -0800 Subject: [PATCH 2/3] Fix force_checkpoint tests and CLI command --- deps/rabbit/src/rabbit_fifo.erl | 20 --------- deps/rabbit/src/rabbit_fifo.hrl | 20 +++++++++ deps/rabbit/src/rabbit_quorum_queue.erl | 10 ++--- deps/rabbit/test/quorum_queue_SUITE.erl | 42 ++++++------------- .../commands/force_checkpoint_command.ex | 23 +--------- 5 files changed, 39 insertions(+), 76 deletions(-) diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 7fd616245532..48fc3f9aad2d 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -925,26 +925,6 @@ which_module(3) -> rabbit_fifo_v3; which_module(4) -> ?MODULE; which_module(5) -> ?MODULE. --define(AUX, aux_v3). - --record(checkpoint, {index :: ra:index(), - timestamp :: milliseconds(), - smallest_index :: undefined | ra:index(), - messages_total :: non_neg_integer(), - indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(), - unused_1 = ?NIL}). --record(aux_gc, {last_raft_idx = 0 :: ra:index()}). --record(aux, {name :: atom(), - capacity :: term(), - gc = #aux_gc{} :: #aux_gc{}}). --record(?AUX, {name :: atom(), - last_decorators_state :: term(), - capacity :: term(), - gc = #aux_gc{} :: #aux_gc{}, - tick_pid :: undefined | pid(), - cache = #{} :: map(), - last_checkpoint :: #checkpoint{}}). - init_aux(Name) when is_atom(Name) -> %% TODO: catch specific exception throw if table already exists ok = ra_machine_ets:create_table(rabbit_fifo_usage, diff --git a/deps/rabbit/src/rabbit_fifo.hrl b/deps/rabbit/src/rabbit_fifo.hrl index c74740149925..32f8f19eb642 100644 --- a/deps/rabbit/src/rabbit_fifo.hrl +++ b/deps/rabbit/src/rabbit_fifo.hrl @@ -227,3 +227,23 @@ msg_ttl => non_neg_integer(), created => non_neg_integer() }. + +-define(AUX, aux_v3). + +-record(checkpoint, {index :: ra:index(), + timestamp :: milliseconds(), + smallest_index :: undefined | ra:index(), + messages_total :: non_neg_integer(), + indexes = ?CHECK_MIN_INDEXES :: non_neg_integer(), + unused_1 = ?NIL}). +-record(aux_gc, {last_raft_idx = 0 :: ra:index()}). +-record(aux, {name :: atom(), + capacity :: term(), + gc = #aux_gc{} :: #aux_gc{}}). +-record(?AUX, {name :: atom(), + last_decorators_state :: term(), + capacity :: term(), + gc = #aux_gc{} :: #aux_gc{}, + tick_pid :: undefined | pid(), + cache = #{} :: map(), + last_checkpoint :: #checkpoint{}}). diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index f62c67dea6ba..c8e7f79b7652 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -2089,13 +2089,13 @@ force_all_queues_shrink_member_to_current_member(ListQQFun) when is_function(Lis force_checkpoint_on_queue(QName) -> Node = node(), QNameFmt = rabbit_misc:rs(QName), - case rabbit_amqqueue:lookup(QName) of + case rabbit_db_queue:get_durable(QName) of {ok, Q} when ?amqqueue_is_classic(Q) -> {error, classic_queue_not_supported}; {ok, Q} when ?amqqueue_is_quorum(Q) -> {RaName, _} = amqqueue:get_pid(Q), - rpc:call(Node, ra, cast_aux_command, [{RaName, Node}, force_checkpoint]), - rabbit_log:debug("Sent command to force checkpoint ~ts", [QNameFmt]); + rabbit_log:debug("Sending command to force ~ts to take a checkpoint", [QNameFmt]), + rpc:call(Node, ra, cast_aux_command, [{RaName, Node}, force_checkpoint]); {ok, _Q} -> {error, not_quorum_queue}; {error, _} = E -> @@ -2114,8 +2114,7 @@ force_checkpoint(VhostSpec, QueueSpec) -> {QName, {error, Err}} end end - || Q <- rabbit_amqqueue:list(), - amqqueue:get_type(Q) == ?MODULE, + || Q <- rabbit_db_queue:get_all_durable_by_type(?MODULE), is_match(amqqueue:get_vhost(Q), VhostSpec) andalso is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec)]. @@ -2179,4 +2178,3 @@ file_handle_other_reservation() -> file_handle_release_reservation() -> ok. - diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 667d8c3ec3a7..324bfa306be3 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -10,6 +10,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). -include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). +-include_lib("rabbit/src/rabbit_fifo.hrl"). -import(queue_utils, [wait_for_messages_ready/3, wait_for_messages_pending_ack/3, @@ -1327,12 +1328,9 @@ force_checkpoint_on_queue(Config) -> % Wait for initial checkpoint and make sure it's 0; checkpoint hasn't been triggered yet. rabbit_ct_helpers:await_condition( fun() -> - {ok, #{aux := Aux1}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]), - {aux_v3, _, _, _, _, _, _, {checkpoint, Index, _, _, _, _, _}} = Aux1, - case Index of - 0 -> true; - _ -> false - end + {ok, #{aux := Aux}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]), + #aux_v3{last_checkpoint = #checkpoint{index = Index}} = Aux, + Index =:= 0 end), rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, @@ -1341,12 +1339,9 @@ force_checkpoint_on_queue(Config) -> % Wait for initial checkpoint and make sure it's not 0 rabbit_ct_helpers:await_condition( fun() -> - {ok, #{aux := Aux1}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]), - {aux_v3, _, _, _, _, _, _, {checkpoint, Index, _, _, _, _, _}} = Aux1, - case Index of - 0 -> false; - _ -> true - end + {ok, #{aux := Aux}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]), + #aux_v3{last_checkpoint = #checkpoint{index = Index}} = Aux, + Index =/= 0 end). force_checkpoint(Config) -> @@ -1354,6 +1349,7 @@ force_checkpoint(Config) -> rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), QQ = ?config(queue_name, Config), + QQName = rabbit_misc:r(<<"/">>, queue, QQ), CQ = <<"force_checkpoint_cq">>, RaName = ra_name(QQ), @@ -1366,24 +1362,12 @@ force_checkpoint(Config) -> rabbit_ct_client_helpers:publish(Ch, QQ, 3), wait_for_messages_ready([Server0], RaName, 3), - meck:expect(rabbit_quorum_queue, force_checkpoint_on_queue, fun(Q) -> ok end), - - rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, + ForceCheckpointRes = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_quorum_queue, force_checkpoint, [<<".*">>, <<".*">>]), - - % Waiting here to make sure checkpoint has been forced - rabbit_ct_helpers:await_condition( - fun() -> - {ok, #{aux := Aux1}, _} = rpc:call(Server0, ra, member_overview, [{RaName, Server0}]), - {aux_v3, _, _, _, _, _, _, {checkpoint, Index, _, _, _, _, _}} = Aux1, - case Index of - 0 -> false; - _ -> true - end - end), - - % Make sure force_checkpoint_on_queue was only called for the quorun queue - ?assertEqual(1, meck:num_calls(rabbit_quorum_queue, force_checkpoint_on_queue, '_')). + ExpectedRes = [{QQName, {ok}}], + + % Result should only have quorum queue + ?assertEqual(ExpectedRes, ForceCheckpointRes). % Tests that, if the process of a QQ is dead in the moment of declaring a policy % that affects such queue, when the process is made available again, the policy diff --git a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/force_checkpoint_command.ex b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/force_checkpoint_command.ex index 47ed966f2fcd..bdc587fc83bb 100644 --- a/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/force_checkpoint_command.ex +++ b/deps/rabbitmq_cli/lib/rabbitmq/cli/queues/commands/force_checkpoint_command.ex @@ -35,9 +35,6 @@ defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do args = [vhost_pat, queue_pat] case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :force_checkpoint, args) do - {:error, _} = error -> - error - {:badrpc, _} = error -> error @@ -46,7 +43,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do do: [ {:vhost, vhost}, {:name, name}, - {:result, format_result(res)} + {:result, res} ] results -> @@ -54,7 +51,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do do: [ {:vhost, vhost}, {:name, name}, - {:result, format_result(res)} + {:result, res} ] end end @@ -88,20 +85,4 @@ defmodule RabbitMQ.CLI.Queues.Commands.ForceCheckpointCommand do def banner([], _) do "Forcing checkpoint for all matching quorum queues..." end - - # - # Implementation - # - - defp format_result({:ok}) do - "ok" - end - - defp format_result({:error, :timeout}) do - "error: the operation timed out and may not have been completed" - end - - defp format_result({:error, err}) do - to_string(:io_lib.format("error: ~W", [err, 10])) - end end From 4439150e50b245f4523f87d08ae262065d9487f5 Mon Sep 17 00:00:00 2001 From: Aaron Seo Date: Tue, 4 Feb 2025 15:44:44 -0800 Subject: [PATCH 3/3] Add timeout to rpc call for force_checkpoint --- deps/rabbit/src/rabbit_quorum_queue.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index c8e7f79b7652..e33ebfba8bca 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -140,6 +140,7 @@ -define(RPC_TIMEOUT, 1000). -define(START_CLUSTER_TIMEOUT, 5000). -define(START_CLUSTER_RPC_TIMEOUT, 60_000). %% needs to be longer than START_CLUSTER_TIMEOUT +-define(FORCE_CHECKPOINT_RPC_TIMEOUT, 15_000). -define(TICK_INTERVAL, 5000). %% the ra server tick time -define(DELETE_TIMEOUT, 5000). -define(MEMBER_CHANGE_TIMEOUT, 20_000). @@ -2095,7 +2096,7 @@ force_checkpoint_on_queue(QName) -> {ok, Q} when ?amqqueue_is_quorum(Q) -> {RaName, _} = amqqueue:get_pid(Q), rabbit_log:debug("Sending command to force ~ts to take a checkpoint", [QNameFmt]), - rpc:call(Node, ra, cast_aux_command, [{RaName, Node}, force_checkpoint]); + rpc:call(Node, ra, cast_aux_command, [{RaName, Node}, force_checkpoint], ?FORCE_CHECKPOINT_RPC_TIMEOUT); {ok, _Q} -> {error, not_quorum_queue}; {error, _} = E ->