Skip to content

Commit

Permalink
Merge pull request #344 from rabbitmq/rabbitmq-server-336
Browse files Browse the repository at this point in the history
Implements Mirror Queue Sync in Batches
  • Loading branch information
michaelklishin committed Oct 12, 2015
2 parents 63cc2bb + dc72935 commit 44a0ddb
Show file tree
Hide file tree
Showing 9 changed files with 474 additions and 117 deletions.
1 change: 1 addition & 0 deletions ebin/rabbit_app.in
Expand Up @@ -85,6 +85,7 @@
{ssl_apps, [asn1, crypto, public_key, ssl]},
%% see rabbitmq-server#114
{mirroring_flow_control, true},
{mirroring_sync_batch_size, 4096},
%% see rabbitmq-server#227 and related tickets.
%% msg_store_credit_disc_bound only takes effect when
%% messages are persisted to the message store. If messages
Expand Down
17 changes: 15 additions & 2 deletions src/rabbit_backing_queue.erl
Expand Up @@ -33,6 +33,10 @@

-type(flow() :: 'flow' | 'noflow').
-type(msg_ids() :: [rabbit_types:msg_id()]).
%% One entry of a batch_publish/4 batch: the message, its properties and a
%% boolean (bound as _IsDelivered where batches are destructured in
%% rabbit_mirror_queue_master and rabbit_mirror_queue_slave).
-type(publish() :: {rabbit_types:basic_message(),
rabbit_types:message_properties(), boolean()}).
%% One entry of a batch_publish_delivered/4 batch: the message and its
%% properties only; this shape carries no boolean flag.
-type(delivered_publish() :: {rabbit_types:basic_message(),
rabbit_types:message_properties()}).
-type(fetch_result(Ack) ::
('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
-type(drop_result(Ack) ::
Expand Down Expand Up @@ -104,6 +108,9 @@
rabbit_types:message_properties(), boolean(), pid(), flow(),
state()) -> state().

%% Like publish/6 but for batches of publishes. Each element of the
%% list is a publish() tuple, i.e. {Message, MessageProperties,
%% boolean()}; the pid() and flow() arguments apply to the whole
%% batch rather than to individual messages.
-callback batch_publish([publish()], pid(), flow(), state()) -> state().

%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
Expand All @@ -112,6 +119,11 @@
state())
-> {ack(), state()}.

%% Like publish_delivered/5 but for batches of publishes. Each element
%% of the list is a delivered_publish() tuple, i.e. {Message,
%% MessageProperties}. Returns a list of acks together with the new
%% state.
%% NOTE(review): rabbit_mirror_queue_slave zips the returned acks with
%% the message ids in publish order, so implementations presumably must
%% return exactly one ack per publish, in the same order -- confirm.
-callback batch_publish_delivered([delivered_publish()], pid(), flow(),
state())
-> {[ack()], state()}.

%% Called to inform the BQ about messages which have reached the
%% queue, but are not going to be further passed to BQ.
-callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state().
Expand Down Expand Up @@ -253,8 +265,9 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
{delete_and_terminate, 2}, {delete_crashed, 1}, {purge, 1},
{purge_acks, 1}, {publish, 6},
{publish_delivered, 5}, {discard, 4}, {drain_confirmed, 1},
{purge_acks, 1}, {publish, 6}, {publish_delivered, 5},
{batch_publish, 4}, {batch_publish_delivered, 4},
{discard, 4}, {drain_confirmed, 1},
{dropwhile, 2}, {fetchwhile, 4}, {fetch, 2},
{drop, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
Expand Down
45 changes: 43 additions & 2 deletions src/rabbit_mirror_queue_master.erl
Expand Up @@ -18,6 +18,7 @@

-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, purge_acks/1, publish/6, publish_delivered/5,
batch_publish/4, batch_publish_delivered/4,
discard/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
Expand Down Expand Up @@ -147,13 +148,15 @@ sync_mirrors(HandleInfo, EmitStats,
QName, "Synchronising: " ++ Fmt ++ "~n", Params)
end,
Log("~p messages to synchronise", [BQ:len(BQS)]),
{ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
{ok, #amqqueue{slave_pids = SPids} = Q} = rabbit_amqqueue:lookup(QName),
SyncBatchSize = rabbit_mirror_queue_misc:sync_batch_size(Q),
Log("batch size: ~p", [SyncBatchSize]),
Ref = make_ref(),
Syncer = rabbit_mirror_queue_sync:master_prepare(Ref, QName, Log, SPids),
gm:broadcast(GM, {sync_start, Ref, Syncer, SPids}),
S = fun(BQSN) -> State#state{backing_queue_state = BQSN} end,
case rabbit_mirror_queue_sync:master_go(
Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) of
Syncer, Ref, Log, HandleInfo, EmitStats, SyncBatchSize, BQ, BQS) of
{shutdown, R, BQS1} -> {stop, R, S(BQS1)};
{sync_died, R, BQS1} -> Log("~p", [R]),
{ok, S(BQS1)};
Expand Down Expand Up @@ -241,6 +244,27 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow,
BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).

%% Batch counterpart of publish/6. Asserts that none of the messages is
%% already present in seen_status, broadcasts the whole batch over GM
%% (passing the summed message size as the third argument of
%% gm:broadcast/3), hands the batch to the underlying backing queue and
%% finally makes sure the publishing channel is monitored.
batch_publish(Publishes, ChPid, Flow,
              State = #state { gm                  = GM,
                               seen_status         = SS,
                               backing_queue       = BQ,
                               backing_queue_state = BQS }) ->
    {RevDelivered, TotalSize} =
        lists:foldl(
          fun ({Msg = #basic_message { id = MsgId }, MsgProps, _IsDelivered},
               {Acc, Size}) ->
                  false = dict:is_key(MsgId, SS), %% ASSERTION
                  {[{Msg, MsgProps, true} | Acc], %% [0]
                   Size + rabbit_basic:msg_size(Msg)}
          end, {[], 0}, Publishes),
    %% The fold conses onto the accumulator, so restore publish order.
    Delivered = lists:reverse(RevDelivered),
    ok = gm:broadcast(GM, {batch_publish, ChPid, Flow, Delivered},
                      TotalSize),
    BQS1 = BQ:batch_publish(Delivered, ChPid, Flow, BQS),
    ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
%% [0] The slave publishes every mirrored message with the IsDelivered
%% flag set to true. Forcing the flag here, while the batch is being
%% walked anyway, saves the slave from iterating over the batch again
%% just to rewrite it.

publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
ChPid, Flow, State = #state { gm = GM,
seen_status = SS,
Expand All @@ -253,6 +277,23 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.

%% Batch counterpart of publish_delivered/5. Asserts that none of the
%% messages is already present in seen_status, broadcasts the batch
%% unchanged over GM (passing the summed message size as the third
%% argument of gm:broadcast/3) and then hands it to the underlying
%% backing queue. Returns the ack tags produced by the backing queue
%% together with the updated state, after making sure the publishing
%% channel is monitored.
batch_publish_delivered(Publishes, ChPid, Flow,
                        State = #state { gm                  = GM,
                                         seen_status         = SS,
                                         backing_queue       = BQ,
                                         backing_queue_state = BQS }) ->
    TotalSize =
        lists:foldl(
          fun ({Msg = #basic_message { id = MsgId }, _MsgProps}, Size) ->
                  false = dict:is_key(MsgId, SS), %% ASSERTION
                  Size + rabbit_basic:msg_size(Msg)
          end, 0, Publishes),
    ok = gm:broadcast(GM, {batch_publish_delivered, ChPid, Flow, Publishes},
                      TotalSize),
    {AckTags, BQS1} = BQ:batch_publish_delivered(Publishes, ChPid, Flow, BQS),
    Monitored = ensure_monitoring(
                  ChPid, State #state { backing_queue_state = BQS1 }),
    {AckTags, Monitored}.

discard(MsgId, ChPid, Flow, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
Expand Down
75 changes: 61 additions & 14 deletions src/rabbit_mirror_queue_misc.erl
Expand Up @@ -22,7 +22,7 @@
initial_queue_node/2, suggested_queue_nodes/1,
is_mirrored/1, update_mirrors/2, validate_policy/1,
maybe_auto_sync/1, maybe_drop_master_after_sync/1,
log_info/3, log_warning/3]).
sync_batch_size/1, log_info/3, log_warning/3]).

%% for testing only
-export([module/1]).
Expand All @@ -38,11 +38,14 @@
[policy_validator, <<"ha-params">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[policy_validator, <<"ha-sync-mode">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[policy_validator, <<"ha-sync-batch-size">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}},
{requires, rabbit_registry},
{enables, recovery}]}).


%%----------------------------------------------------------------------------

-ifdef(use_specs).
Expand Down Expand Up @@ -332,6 +335,14 @@ module(Mode) when is_binary(Mode) ->
end
end.

%% Policy validator for the "ha-mode" key. Returns ok when module/1
%% resolves Mode to an implementation module, and an
%% {error, Format, Args} triple when module/1 reports not_mirrored.
validate_mode(Mode) ->
    case module(Mode) of
        not_mirrored ->
            {error, "~p is not a valid ha-mode value", [Mode]};
        {ok, _Implementation} ->
            ok
    end.

is_mirrored(Q) ->
case module(Q) of
{ok, _} -> true;
Expand All @@ -355,6 +366,22 @@ maybe_auto_sync(Q = #amqqueue{pid = QPid}) ->
ok
end.

%% Returns the number of messages to send per batch when synchronising
%% mirrors of queue Q.
%%
%% The value of the "ha-sync-batch-size" policy key is used when it is a
%% positive integer, which is exactly the set of values accepted by
%% validate_sync_batch_size/1. Anything else (including 'none', i.e. the
%% key is not set) falls back to default_batch_size/0.
%%
%% The is_integer/1 guard matters: in Erlang's term order every atom
%% compares greater than every number, so a bare "BatchSize > 0" test
%% would also accept 'none'.
sync_batch_size(#amqqueue{} = Q) ->
    case policy(<<"ha-sync-batch-size">>, Q) of
        BatchSize when is_integer(BatchSize), BatchSize > 0 ->
            BatchSize;
        _ ->
            default_batch_size()
    end.

%% Last-resort batch size, passed as the default argument to
%% rabbit_misc:get_env/3 below.
-define(DEFAULT_BATCH_SIZE, 4096).

%% Batch size used when the queue's policy does not provide a usable
%% "ha-sync-batch-size". Looks up mirroring_sync_batch_size for the
%% rabbit application, with ?DEFAULT_BATCH_SIZE as the default argument.
%% NOTE(review): the value found in the environment is returned as is;
%% no validation of it is visible here -- confirm it is checked elsewhere.
default_batch_size() ->
rabbit_misc:get_env(rabbit, mirroring_sync_batch_size,
?DEFAULT_BATCH_SIZE).

update_mirrors(OldQ = #amqqueue{pid = QPid},
NewQ = #amqqueue{pid = QPid}) ->
case {is_mirrored(OldQ), is_mirrored(NewQ)} of
Expand Down Expand Up @@ -410,25 +437,37 @@ validate_policy(KeyList) ->
Mode = proplists:get_value(<<"ha-mode">>, KeyList, none),
Params = proplists:get_value(<<"ha-params">>, KeyList, none),
SyncMode = proplists:get_value(<<"ha-sync-mode">>, KeyList, none),
SyncBatchSize = proplists:get_value(
<<"ha-sync-batch-size">>, KeyList, none),
PromoteOnShutdown = proplists:get_value(
<<"ha-promote-on-shutdown">>, KeyList, none),
case {Mode, Params, SyncMode, PromoteOnShutdown} of
{none, none, none, none} ->
case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown} of
{none, none, none, none, none} ->
ok;
{none, _, _, _} ->
{none, _, _, _, _} ->
{error, "ha-mode must be specified to specify ha-params, "
"ha-sync-mode or ha-promote-on-shutdown", []};
_ ->
case module(Mode) of
{ok, M} -> case M:validate_policy(Params) of
ok -> case validate_sync_mode(SyncMode) of
ok -> validate_pos(PromoteOnShutdown);
E -> E
end;
E -> E
end;
_ -> {error, "~p is not a valid ha-mode value", [Mode]}
end
validate_policies(
[{Mode, fun validate_mode/1},
{Params, ha_params_validator(Mode)},
{SyncMode, fun validate_sync_mode/1},
{SyncBatchSize, fun validate_sync_batch_size/1},
{PromoteOnShutdown, fun validate_pos/1}])
end.

%% Builds the validator fun for the "ha-params" key. The returned fun
%% resolves Mode to its implementation module and delegates to that
%% module's validate_policy/1. The {ok, _} match is assertive: the fun
%% crashes with badmatch if module/1 does not resolve Mode, so it must
%% only be invoked once the mode itself has been validated.
ha_params_validator(Mode) ->
    fun (Params) ->
            {ok, ModeModule} = module(Mode),
            ModeModule:validate_policy(Params)
    end.

%% Runs a list of {Value, Validator} pairs in order. Each Validator is
%% applied to its Value; validation stops at the first result that is
%% not ok and that result is returned unchanged. Returns ok when every
%% validator returned ok (or the list is empty). Validators after the
%% first failure are never called.
validate_policies([]) ->
    ok;
validate_policies([{Value, Check} | Remaining]) ->
    continue_validation(Check(Value), Remaining).

%% Proceeds with the remaining pairs after a successful check, or
%% short-circuits with the failure.
continue_validation(ok, Remaining) ->
    validate_policies(Remaining);
continue_validation(Failure, _Remaining) ->
    Failure.

validate_sync_mode(SyncMode) ->
Expand All @@ -440,6 +479,14 @@ validate_sync_mode(SyncMode) ->
"or \"automatic\", got ~p", [Mode]}
end.

%% Policy validator for the "ha-sync-batch-size" key. Accepts 'none'
%% (key not set) and any integer greater than 0; everything else yields
%% an {error, Format, Args} triple.
validate_sync_batch_size(none) ->
    ok;
validate_sync_batch_size(N) when is_integer(N) andalso N > 0 ->
    ok;
validate_sync_batch_size(N) ->
    {error, "ha-sync-batch-size takes an integer greater than 0, "
     "~p given", [N]}.

validate_pos(PromoteOnShutdown) ->
case PromoteOnShutdown of
<<"always">> -> ok;
Expand Down
27 changes: 27 additions & 0 deletions src/rabbit_mirror_queue_slave.erl
Expand Up @@ -851,6 +851,15 @@ process_instruction({publish, ChPid, Flow, MsgProps,
publish_or_discard(published, ChPid, MsgId, State),
BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, Flow, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
%% Batch counterpart of the {publish, ...} instruction: calls
%% maybe_flow_ack/2 once for the whole batch, threads the state through
%% publish_or_discard/4 for every message id in the batch, and then hands
%% the batch, unmodified, to the backing queue's batch_publish/4.
process_instruction({batch_publish, ChPid, Flow, Publishes}, State) ->
maybe_flow_ack(ChPid, Flow),
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
lists:foldl(fun ({#basic_message { id = MsgId },
_MsgProps, _IsDelivered}, St) ->
publish_or_discard(published, ChPid, MsgId, St)
end, State, Publishes),
BQS1 = BQ:batch_publish(Publishes, ChPid, Flow, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({publish_delivered, ChPid, Flow, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
maybe_flow_ack(ChPid, Flow),
Expand All @@ -860,6 +869,24 @@ process_instruction({publish_delivered, ChPid, Flow, MsgProps,
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS),
{ok, maybe_store_ack(true, MsgId, AckTag,
State1 #state { backing_queue_state = BQS1 })};
%% Batch counterpart of the {publish_delivered, ...} instruction: calls
%% maybe_flow_ack/2 once for the whole batch, records every message via
%% publish_or_discard/4, publishes the batch to the backing queue and
%% stores the resulting ack tag for each message id via maybe_store_ack/4.
process_instruction({batch_publish_delivered, ChPid, Flow, Publishes}, State) ->
maybe_flow_ack(ChPid, Flow),
%% Collect the message ids while threading the state through
%% publish_or_discard/4. Ids are consed onto the accumulator, so MsgIds
%% ends up in reverse publish order.
{MsgIds,
State1 = #state { backing_queue = BQ, backing_queue_state = BQS }} =
lists:foldl(fun ({#basic_message { id = MsgId }, _MsgProps},
{MsgIds, St}) ->
{[MsgId | MsgIds],
publish_or_discard(published, ChPid, MsgId, St)}
end, {[], State}, Publishes),
%% ASSERTION: crashes with badmatch unless the backing queue is empty.
true = BQ:is_empty(BQS),
{AckTags, BQS1} = BQ:batch_publish_delivered(Publishes, ChPid, Flow, BQS),
%% Restore publish order before pairing ids with ack tags. lists:zip/2
%% fails unless both lists have the same length, so this relies on the
%% backing queue returning exactly one ack tag per publish.
MsgIdsAndAcks = lists:zip(lists:reverse(MsgIds), AckTags),
State2 = lists:foldl(
fun ({MsgId, AckTag}, St) ->
maybe_store_ack(true, MsgId, AckTag, St)
end, State1 #state { backing_queue_state = BQS1 },
MsgIdsAndAcks),
{ok, State2};
process_instruction({discard, ChPid, Flow, MsgId}, State) ->
maybe_flow_ack(ChPid, Flow),
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
Expand Down

0 comments on commit 44a0ddb

Please sign in to comment.