Implements Mirror Queue Sync in Batches #344

Merged: 32 commits, Oct 12, 2015

Commits:
f30aaa3  implements BQ:batch_publish and BQ:batch_publish_delivered (videlalvaro, Sep 28, 2015)
676e413  implement mirror message sync in batches (videlalvaro, Sep 28, 2015)
5ec328d  adds ha-sync-batch-size policy (videlalvaro, Oct 1, 2015)
ef2d3f3  retrieves batch size from policy (videlalvaro, Oct 1, 2015)
701ee99  refactors msg broadcast (videlalvaro, Oct 1, 2015)
d480e1d  oops (videlalvaro, Oct 1, 2015)
0e89449  refactors shared logic (videlalvaro, Oct 1, 2015)
e8fe201  cosmetics (videlalvaro, Oct 1, 2015)
ccae00a  adds explanation (videlalvaro, Oct 1, 2015)
5c2d50d  implements batch publishing for mirrored queues (videlalvaro, Oct 2, 2015)
8076b1b  implements batch publishing for priority queues (videlalvaro, Oct 2, 2015)
90c0244  adds batch publish tests (videlalvaro, Oct 4, 2015)
7cdd8c7  fixes failing test (videlalvaro, Oct 4, 2015)
474520d  fixes retrieving sync_batch_size from policy (videlalvaro, Oct 4, 2015)
332c389  improves comment about sync batch order (videlalvaro, Oct 4, 2015)
b813c42  cosmetics (videlalvaro, Oct 4, 2015)
b6f44d6  fixes arguments passed to batch_publish_* (videlalvaro, Oct 4, 2015)
7f27f43  off by one error (videlalvaro, Oct 4, 2015)
e0bb5df  Merge branch 'master' into rabbitmq-server-336 (videlalvaro, Oct 4, 2015)
3776634  improves comment (videlalvaro, Oct 5, 2015)
215dac3  restores new line (videlalvaro, Oct 5, 2015)
c863a81  Merge branch 'master' into rabbitmq-server-336 (michaelklishin, Oct 10, 2015)
199d5a9  Wording (michaelklishin, Oct 10, 2015)
fbc7ff5  removes unused constant (videlalvaro, Oct 10, 2015)
ed516d4  Clarify this comment (michaelklishin, Oct 10, 2015)
0646e9d  adds default sync batch size on the app config (videlalvaro, Oct 10, 2015)
1a0a00c  get into account unacked messages when syncing (videlalvaro, Oct 10, 2015)
b43b3b9  send msg batch in proper order (videlalvaro, Oct 10, 2015)
1929533  refactors message sync'ing in batches (videlalvaro, Oct 10, 2015)
1a08f21  fixes conflicts (videlalvaro, Oct 10, 2015)
e17a5e2  removes non batch-sync code path (videlalvaro, Oct 10, 2015)
dc72935  Move this constant closer to the only place it is used (michaelklishin, Oct 12, 2015)
Files changed (diff shown from 22 of the 32 commits):
src/rabbit_backing_queue.erl (15 additions, 2 deletions)

@@ -33,6 +33,10 @@

-type(flow() :: 'flow' | 'noflow').
-type(msg_ids() :: [rabbit_types:msg_id()]).
-type(publish() :: {rabbit_types:basic_message(),
                    rabbit_types:message_properties(), boolean()}).
-type(delivered_publish() :: {rabbit_types:basic_message(),
                              rabbit_types:message_properties()}).
-type(fetch_result(Ack) ::
        ('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
-type(drop_result(Ack) ::
@@ -104,6 +108,9 @@
                  rabbit_types:message_properties(), boolean(), pid(), flow(),
                  state()) -> state().

%% Like publish/6 but for batches of publishes.
-callback batch_publish([publish()], pid(), flow(), state()) -> state().

%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
@@ -112,6 +119,11 @@
                            state())
    -> {ack(), state()}.

%% Like publish_delivered/5 but for batches of publishes.
-callback batch_publish_delivered([delivered_publish()], pid(), flow(),
                                  state())
    -> {[ack()], state()}.
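A minimal sketch (not part of this diff) of how a backing queue with no native batch path could satisfy the two new callbacks by delegating to the existing per-message ones; the enclosing module is hypothetical:

batch_publish(Publishes, ChPid, Flow, State) ->
    %% publish each {Msg, MsgProps, IsDelivered} triple in order
    lists:foldl(fun ({Msg, MsgProps, IsDelivered}, StateN) ->
                        publish(Msg, MsgProps, IsDelivered, ChPid, Flow, StateN)
                end, State, Publishes).

batch_publish_delivered(Publishes, ChPid, Flow, State) ->
    %% collect one ack tag per message, then restore publish order
    {AckTags, StateN} =
        lists:foldl(fun ({Msg, MsgProps}, {Acks, St}) ->
                            {AckTag, St1} =
                                publish_delivered(Msg, MsgProps, ChPid, Flow, St),
                            {[AckTag | Acks], St1}
                    end, {[], State}, Publishes),
    {lists:reverse(AckTags), StateN}.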

%% Called to inform the BQ about messages which have reached the
%% queue, but are not going to be further passed to BQ.
-callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state().
@@ -253,8 +265,9 @@
 behaviour_info(callbacks) ->
     [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
      {delete_and_terminate, 2}, {delete_crashed, 1}, {purge, 1},
-     {purge_acks, 1}, {publish, 6},
-     {publish_delivered, 5}, {discard, 4}, {drain_confirmed, 1},
+     {purge_acks, 1}, {publish, 6}, {publish_delivered, 5},
+     {batch_publish, 4}, {batch_publish_delivered, 4},
+     {discard, 4}, {drain_confirmed, 1},
      {dropwhile, 2}, {fetchwhile, 4}, {fetch, 2},
      {drop, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
      {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
src/rabbit_mirror_queue_master.erl (43 additions, 2 deletions)

@@ -18,6 +18,7 @@

 -export([init/3, terminate/2, delete_and_terminate/2,
          purge/1, purge_acks/1, publish/6, publish_delivered/5,
+         batch_publish/4, batch_publish_delivered/4,
          discard/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
          len/1, is_empty/1, depth/1, drain_confirmed/1,
          dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
@@ -147,13 +148,15 @@ sync_mirrors(HandleInfo, EmitStats,
                     QName, "Synchronising: " ++ Fmt ++ "~n", Params)
           end,
     Log("~p messages to synchronise", [BQ:len(BQS)]),
-    {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
+    {ok, #amqqueue{slave_pids = SPids} = Q} = rabbit_amqqueue:lookup(QName),
+    SyncBatchSize = rabbit_mirror_queue_misc:sync_batch_size(Q),
+    Log("batch size: ~p", [SyncBatchSize]),
     Ref = make_ref(),
     Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, Log, SPids),
     gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
     S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
     case rabbit_mirror_queue_sync:master_go(
-           Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) of
+           Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) of
         {shutdown,  R, BQS1} -> {stop, R, S(BQS1)};
         {sync_died, R, BQS1} -> Log("~p", [R]),
                                 {ok, S(BQS1)};
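Not visible in this diff: rabbit_mirror_queue_sync:master_go/8 consumes the extra SyncBatchSize argument and ships messages to the syncer in batches of that size. A toy sketch of the chunking idea only, helper name hypothetical:

%% split a list into batches of at most N elements, preserving order
chunk(_N, []) ->
    [];
chunk(N, List) when length(List) =< N ->
    [List];
chunk(N, List) ->
    {Batch, Rest} = lists:split(N, List),
    [Batch | chunk(N, Rest)].

%% chunk(2, [a, b, c, d, e]) => [[a, b], [c, d], [e]]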
@@ -241,6 +244,27 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow,
    BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS),
    ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).

batch_publish(Publishes, ChPid, Flow,
              State = #state { gm                  = GM,
                               seen_status         = SS,
                               backing_queue       = BQ,
                               backing_queue_state = BQS }) ->
    {Publishes1, false, MsgSizes} =
        lists:foldl(fun ({Msg = #basic_message { id = MsgId },
                          MsgProps, _IsDelivered}, {Pubs, false, Sizes}) ->
                            {[{Msg, MsgProps, true} | Pubs], %% [0]
                             false = dict:is_key(MsgId, SS), %% ASSERTION
                             Sizes + rabbit_basic:msg_size(Msg)}
                    end, {[], false, 0}, Publishes),
    Publishes2 = lists:reverse(Publishes1),
    ok = gm:broadcast(GM, {batch_publish, ChPid, Flow, Publishes2},
                      MsgSizes),
    BQS1 = BQ:batch_publish(Publishes2, ChPid, Flow, BQS),
    ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
%% [0] When the slave process the publish instruction, it sets the
%% IsDelivered flag to true, so to avoid iterating over the messages
%% again at the slave, we do it here.

Review thread (on "Publishes2 = lists:reverse(Publishes1)"):

Member: If we reverse the list after foldl, how about using foldr?

Author: Mostly from the docs: "foldl/3 is tail recursive and would usually be preferred to foldr/3." (http://erlang.org/doc/man/lists.html#foldl-3)

Author: Also, AFAIK lists:reverse is a BIF.

Author: I'm fine with changing this to foldr if you think it's required.

Member: Let's keep foldl.
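A standalone sketch of the two equivalent shapes discussed above, runnable in a shell (atoms stand in for #basic_message records and properties):

Publishes = [{msg_a, props_a, false}, {msg_b, props_b, false}],
%% foldl + reverse: tail recursive, needs the explicit reverse at the end
Marked1 = lists:reverse(
            lists:foldl(fun ({Msg, Props, _IsDel}, Acc) ->
                                [{Msg, Props, true} | Acc]
                        end, [], Publishes)),
%% foldr: same result without the reverse, at the cost of non-tail recursion
Marked2 = lists:foldr(fun ({Msg, Props, _IsDel}, Acc) ->
                              [{Msg, Props, true} | Acc]
                      end, [], Publishes),
Marked1 = Marked2.  %% both are [{msg_a, props_a, true}, {msg_b, props_b, true}]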
Review thread (on the "%% [0]" comment above):

Member: Is this meant to say "when the slave process handles the publish command"?

Author: Yes, "process" as a verb. Your explanation is clearer, though.

publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
                  ChPid, Flow, State = #state { gm          = GM,
                                                seen_status = SS,
@@ -253,6 +277,23 @@
    State1 = State #state { backing_queue_state = BQS1 },
    {AckTag, ensure_monitoring(ChPid, State1)}.

batch_publish_delivered(Publishes, ChPid, Flow,
                        State = #state { gm                  = GM,
                                         seen_status         = SS,
                                         backing_queue       = BQ,
                                         backing_queue_state = BQS }) ->
    {false, MsgSizes} =
        lists:foldl(fun ({Msg = #basic_message { id = MsgId }, _MsgProps},
                         {false, Sizes}) ->
                            {false = dict:is_key(MsgId, SS), %% ASSERTION
                             Sizes + rabbit_basic:msg_size(Msg)}
                    end, {false, 0}, Publishes),
    ok = gm:broadcast(GM, {batch_publish_delivered, ChPid, Flow, Publishes},
                      MsgSizes),
    {AckTags, BQS1} = BQ:batch_publish_delivered(Publishes, ChPid, Flow, BQS),
    State1 = State #state { backing_queue_state = BQS1 },
    {AckTags, ensure_monitoring(ChPid, State1)}.

discard(MsgId, ChPid, Flow, State = #state { gm                  = GM,
                                             backing_queue       = BQ,
                                             backing_queue_state = BQS,

src/rabbit_mirror_queue_misc.erl (57 additions, 14 deletions)

@@ -22,7 +22,7 @@
          initial_queue_node/2, suggested_queue_nodes/1,
          is_mirrored/1, update_mirrors/2, validate_policy/1,
          maybe_auto_sync/1, maybe_drop_master_after_sync/1,
-         log_info/3, log_warning/3]).
+         sync_batch_size/1, log_info/3, log_warning/3]).

%% for testing only
-export([module/1]).
@@ -38,11 +38,16 @@
          [policy_validator, <<"ha-params">>, ?MODULE]}},
    {mfa, {rabbit_registry, register,
          [policy_validator, <<"ha-sync-mode">>, ?MODULE]}},
    {mfa, {rabbit_registry, register,
          [policy_validator, <<"ha-sync-batch-size">>, ?MODULE]}},
    {mfa, {rabbit_registry, register,
          [policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}},
    {requires, rabbit_registry},
    {enables, recovery}]}).

%% For compatibility with versions that don't support sync batching.
-define(DEFAULT_BATCH_SIZE, 1).
Review thread (on ?DEFAULT_BATCH_SIZE):

Member: Are we talking about pre-3.6.0 versions here? Mixed 3.6.0/3.5.x clusters are not allowed, so we can use a different default.

Author: You are right. I think this constant came to life in my first POCs, but it is not required anymore. Probably not used in the code.

Member: It is used, via rabbit_mirror_queue_misc:sync_batch_size/0. I just don't think it serves any compatibility purpose and therefore has to be 1. I'd suggest making it 16K or so and moving it to the app config. Thoughts?

Member: Changing the default to 16K leads to sync test failures.

Author: Looking at the code, I remembered the original purpose. The idea is for it to be 1, so you either use non-batch sync, or batched sync in case the policy has been set. The logic that assumes a policy batch size of either 1 or > 1 is here: https://github.com/rabbitmq/rabbitmq-server/blob/rabbitmq-server-336/src/rabbit_mirror_queue_sync.erl#L212

Member: OK, that makes more sense. I had changes that moved the default to the app file, bumped the default and simplified sync_batch_size/0; the error is:

    Running 5 of 72 tests; FILTER=eager_sync; COVER=false

    eager_sync
    ----------
    eager_sync:                       [setup] [running]rabbit_test_runner: make_test_multi...*failed*
    in function sync_detection:wait_for_sync_status/5 (test/src/sync_detection.erl, line 159)
    in call from eager_sync:sync/2 (test/src/eager_sync.erl, line 167)
    in call from eager_sync:eager_sync/1 (test/src/eager_sync.erl, line 63)
    in call from rabbit_test_runner:'-make_test_multi/7-fun-2-'/3 (src/rabbit_test_runner.erl, line 129)
    **error:{sync_status_max_tries_failed,[{queue,<<"ha.two.test">>},
                                           {node,c@urano},
                                           {expected_status,true},
                                           {max_tried,100.0}]}
      output:<<"">>

Member: I see no reason not to batch all the time, only making the batch size configurable (with 16K or so by default).

Author: The problem is finding the right batch size. 16k is too much for big messages; it can even cause a network partition (the reason why we have a 2 GB max message size in the first place). The right value depends on the workload (this is explained in the related rabbitmq-website PR), but if we provide a default, I think it has to be lower.

Member: Messages that are hundreds of MB in size are probably very rare. Most messages in common workloads are < 4K in size. We can go with 4096 as the default value, and those with large messages can adjust it. 4K messages * 4 KiB per message = 16 MiB of payload, not particularly excessive.

Member: @carlhoerberg, can you please help us pick the default batch size for eager (full) mirror sync? Maybe you have some stats on the median/95th-percentile message size distribution at CloudAMQP, or any other data that can help us here?
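For reference, the follow-up commit 0646e9d moves the fallback into the application environment. A configuration fragment along these lines would set it; the key name is assumed from that commit and is not shown in this diff:

%% rabbitmq.config: use 4096-message sync batches unless a policy overrides it
[{rabbit, [{mirroring_sync_batch_size, 4096}]}].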


%%----------------------------------------------------------------------------

-ifdef(use_specs).
@@ -332,6 +337,14 @@ module(Mode) when is_binary(Mode) ->
        end
    end.

validate_mode(Mode) ->
    case module(Mode) of
        {ok, _Module} ->
            ok;
        not_mirrored ->
            {error, "~p is not a valid ha-mode value", [Mode]}
    end.

is_mirrored(Q) ->
    case module(Q) of
        {ok, _}  -> true;
@@ -355,6 +368,16 @@ maybe_auto_sync(Q = #amqqueue{pid = QPid}) ->
            ok
    end.

sync_batch_size(#amqqueue{} = Q) ->
    case policy(<<"ha-sync-batch-size">>, Q) of
        none -> %% we need this case because none > 1 == true
            ?DEFAULT_BATCH_SIZE;
        BatchSize when BatchSize > 1 ->
            BatchSize;
        _ ->
            ?DEFAULT_BATCH_SIZE
    end.
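For context, the policy key read here is set alongside the existing ha-* keys; for example (policy name and queue pattern illustrative):

rabbitmqctl set_policy ha-batch "^ha\." \
  '{"ha-mode":"all","ha-sync-mode":"automatic","ha-sync-batch-size":4096}'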

update_mirrors(OldQ = #amqqueue{pid = QPid},
               NewQ = #amqqueue{pid = QPid}) ->
    case {is_mirrored(OldQ), is_mirrored(NewQ)} of
@@ -410,25 +433,37 @@ validate_policy(KeyList) ->
     Mode = proplists:get_value(<<"ha-mode">>, KeyList, none),
     Params = proplists:get_value(<<"ha-params">>, KeyList, none),
     SyncMode = proplists:get_value(<<"ha-sync-mode">>, KeyList, none),
+    SyncBatchSize = proplists:get_value(
+                      <<"ha-sync-batch-size">>, KeyList, none),
     PromoteOnShutdown = proplists:get_value(
                           <<"ha-promote-on-shutdown">>, KeyList, none),
-    case {Mode, Params, SyncMode, PromoteOnShutdown} of
-        {none, none, none, none} ->
+    case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown} of
+        {none, none, none, none, none} ->
             ok;
-        {none, _, _, _} ->
+        {none, _, _, _, _} ->
             {error, "ha-mode must be specified to specify ha-params, "
              "ha-sync-mode or ha-promote-on-shutdown", []};
         _ ->
-            case module(Mode) of
-                {ok, M} -> case M:validate_policy(Params) of
-                               ok -> case validate_sync_mode(SyncMode) of
-                                         ok -> validate_pos(PromoteOnShutdown);
-                                         E  -> E
-                                     end;
-                               E -> E
-                           end;
-                _ -> {error, "~p is not a valid ha-mode value", [Mode]}
-            end
+            validate_policies(
+              [{Mode, fun validate_mode/1},
+               {Params, ha_params_validator(Mode)},
+               {SyncMode, fun validate_sync_mode/1},
+               {SyncBatchSize, fun validate_sync_batch_size/1},
+               {PromoteOnShutdown, fun validate_pos/1}])
     end.

ha_params_validator(Mode) ->
    fun(Val) ->
            {ok, M} = module(Mode),
            M:validate_policy(Val)
    end.

validate_policies([]) ->
    ok;
validate_policies([{Val, Validator} | Rest]) ->
    case Validator(Val) of
        ok -> validate_policies(Rest);
        E  -> E
    end.
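An illustration of the short-circuit: the first failing validator's error is returned and later validators never run (values hypothetical):

validate_policies([{<<"all">>, fun validate_mode/1},
                   {0, fun validate_sync_batch_size/1},
                   {none, fun validate_pos/1}]).
%% => {error, "ha-sync-batch-size takes an integer greater than 0, ~p given", [0]}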

validate_sync_mode(SyncMode) ->
@@ -440,6 +475,14 @@
            "or \"automatic\", got ~p", [Mode]}
    end.

validate_sync_batch_size(none) ->
    ok;
validate_sync_batch_size(N) when is_integer(N) andalso N > 0 ->
    ok;
validate_sync_batch_size(N) ->
    {error, "ha-sync-batch-size takes an integer greater than 0, "
     "~p given", [N]}.

validate_pos(PromoteOnShutdown) ->
    case PromoteOnShutdown of
        <<"always">>      -> ok;

src/rabbit_mirror_queue_slave.erl (27 additions)

@@ -851,6 +851,15 @@ process_instruction({publish, ChPid, Flow, MsgProps,
        publish_or_discard(published, ChPid, MsgId, State),
    BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, Flow, BQS),
    {ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({batch_publish, ChPid, Flow, Publishes}, State) ->
    maybe_flow_ack(ChPid, Flow),
    State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
        lists:foldl(fun ({#basic_message { id = MsgId },
                          _MsgProps, _IsDelivered}, St) ->
                            publish_or_discard(published, ChPid, MsgId, St)
                    end, State, Publishes),
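    %% the master has already flagged every message in the batch as
    %% delivered (see the [0] note in rabbit_mirror_queue_master), so the
    %% IsDelivered element of each triple is ignored here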
    BQS1 = BQ:batch_publish(Publishes, ChPid, Flow, BQS),
    {ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({publish_delivered, ChPid, Flow, MsgProps,
                     Msg = #basic_message { id = MsgId }}, State) ->
    maybe_flow_ack(ChPid, Flow),
@@ -860,6 +869,24 @@
    {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS),
    {ok, maybe_store_ack(true, MsgId, AckTag,
                         State1 #state { backing_queue_state = BQS1 })};
process_instruction({batch_publish_delivered, ChPid, Flow, Publishes}, State) ->
    maybe_flow_ack(ChPid, Flow),
    {MsgIds,
     State1 = #state { backing_queue = BQ, backing_queue_state = BQS }} =
        lists:foldl(fun ({#basic_message { id = MsgId }, _MsgProps},
                         {MsgIds, St}) ->
                            {[MsgId | MsgIds],
                             publish_or_discard(published, ChPid, MsgId, St)}
                    end, {[], State}, Publishes),
    true = BQ:is_empty(BQS),
    {AckTags, BQS1} = BQ:batch_publish_delivered(Publishes, ChPid, Flow, BQS),
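    %% AckTags comes back in publish order, while the foldl above built
    %% MsgIds in reverse; reverse MsgIds before zipping so each message id
    %% pairs with its own ack tag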
    MsgIdsAndAcks = lists:zip(lists:reverse(MsgIds), AckTags),
    State2 = lists:foldl(
               fun ({MsgId, AckTag}, St) ->
                       maybe_store_ack(true, MsgId, AckTag, St)
               end, State1 #state { backing_queue_state = BQS1 },
               MsgIdsAndAcks),
    {ok, State2};
process_instruction({discard, ChPid, Flow, MsgId}, State) ->
    maybe_flow_ack(ChPid, Flow),
    State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =