Flush messages to disk in batches. #1388

Merged
merged 3 commits on Oct 23, 2017
14 changes: 11 additions & 3 deletions src/rabbit_amqqueue_process.erl
@@ -985,9 +985,9 @@ prioritise_cast(Msg, _Len, _State) ->
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
{ack, _AckTags, _ChPid} -> 3; %% [1]
{resume, _ChPid} -> 2;
{notify_sent, _ChPid, _Credit} -> 1;
{ack, _AckTags, _ChPid} -> 4; %% [1]
{resume, _ChPid} -> 3;
{notify_sent, _ChPid, _Credit} -> 2;
_ -> 0
end.

@@ -999,6 +999,9 @@ prioritise_cast(Msg, _Len, _State) ->
%% stack are optimised for that) and to make things easier to reason
%% about. Finally, we prioritise ack over resume since it should
%% always reduce memory use.
%% bump_reduce_memory_use is prioritised over publishes because sending
%% credit to oneself is hard to reason about; consumers can continue to
%% be served while reduce_memory_use is in progress.

prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
case Msg of
@@ -1008,6 +1011,7 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
{drop_expired, _Version} -> 8;
emit_stats -> 7;
sync_timeout -> 6;
bump_reduce_memory_use -> 1;
_ -> 0
end.

@@ -1382,6 +1386,10 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ,
%% rabbit_variable_queue:msg_store_write/4.
credit_flow:handle_bump_msg(Msg),
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
handle_info(bump_reduce_memory_use, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
put(waiting_bump, false),
noreply(State#q{backing_queue_state = BQ:resume(BQS)});

handle_info(Info, State) ->
{stop, {unhandled_info, Info}, State}.
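The handler above relies on a simple self-notification pattern: rather than granting credit-flow credit to itself, the queue process sends one prioritised bump_reduce_memory_use message and tracks a process-dictionary flag so that at most one such message is ever waiting in the mailbox. A minimal sketch of that pattern (hypothetical module and function names, not part of this PR):

-module(bump_sketch).
-export([request_bump/0, on_bump/0]).

%% Schedule another paging pass; a no-op while one is already pending,
%% so the mailbox never holds more than one bump message.
request_bump() ->
    case get(waiting_bump) of
        true -> ok;
        _    -> self() ! bump_reduce_memory_use,
                put(waiting_bump, true)
    end.

%% On receipt of bump_reduce_memory_use: clear the flag so the next
%% pass may schedule a further bump.
on_bump() ->
    put(waiting_bump, false),
    ok.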
3 changes: 3 additions & 0 deletions src/rabbit_mirror_queue_slave.erl
@@ -348,6 +348,9 @@ handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
noreply(State);

handle_info(bump_reduce_memory_use, State) ->
noreply(State);

%% In the event of a short partition during sync we can detect the
%% master's 'death', drop out of sync, and then receive sync messages
%% which were still in flight. Ignore them.
74 changes: 54 additions & 20 deletions src/rabbit_variable_queue.erl
@@ -2363,45 +2363,79 @@ reduce_memory_use(State = #vqstate {
out = AvgEgress,
ack_in = AvgAckIngress,
ack_out = AvgAckEgress } }) ->
State1 = #vqstate { q2 = Q2, q3 = Q3 } =
{CreditDiscBound, _} = rabbit_misc:get_env(rabbit,
msg_store_credit_disc_bound,
?CREDIT_DISC_BOUND),
{NeedResumeA2B, State1} = {_, #vqstate { q2 = Q2, q3 = Q3 }} =
case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
0 -> State;
0 -> {false, State};
%% Reduce memory of pending acks and alphas. The order is
%% determined based on which is growing faster. Whichever
%% comes second may very well get a quota of 0 if the
%% first manages to push out the max number of messages.
S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) >
A2BChunk ->
%% If there are few messages to be sent to the message store and
%% many messages to be embedded in the queue index, limit the
%% number of messages flushed in one batch to avoid blocking the
%% process for too long.
A2BChunkActual = case A2BChunk > CreditDiscBound * 2 of
true -> CreditDiscBound * 2;
false -> A2BChunk
end,
Funs = case ((AvgAckIngress - AvgAckEgress) >
(AvgIngress - AvgEgress)) of
true -> [fun limit_ram_acks/2,
fun push_alphas_to_betas/2];
false -> [fun push_alphas_to_betas/2,
fun limit_ram_acks/2]
end,
{_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
{Quota, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
ReduceFun(QuotaN, StateN)
end, {S1, State}, Funs),
State2
end, {A2BChunkActual, State}, Funs),
{(Quota == 0) andalso (A2BChunk > A2BChunkActual), State2}
end,

State3 =
Permitted = permitted_beta_count(State1),
{NeedResumeB2D, State3} =
%% If Q2 and Q3 hold more messages whose queue position is kept in
%% RAM (betas) than IoBatchSize, write their queue positions to disk,
%% i.e. push_betas_to_deltas.
case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
permitted_beta_count(State1)) of
S2 when S2 >= IoBatchSize ->
%% There is an implicit, but subtle, upper bound here. We
%% may shuffle a lot of messages from Q2/3 into delta, but
%% the number of these that require any disk operation,
%% namely index writing, i.e. messages that are genuine
%% betas and not gammas, is bounded by the credit_flow
%% limiting of the alpha->beta conversion above.
push_betas_to_deltas(S2, State1);
Permitted) of
B2DChunk when B2DChunk >= IoBatchSize ->
%% Same as for alphas to betas: limit the number of messages
%% flushed to disk at once to avoid blocking the process.
B2DChunkActual = case B2DChunk > CreditDiscBound * 2 of
true -> CreditDiscBound * 2;
false -> B2DChunk
end,
StateBD = push_betas_to_deltas(B2DChunkActual, State1),
{B2DChunk > B2DChunkActual, StateBD};
_ ->
State1
{false, State1}
end,
%% See rabbitmq-server-290 for the reasons behind this GC call.
garbage_collect(),
%% We can be blocked by the credit flow, limited by the batch size,
%% or simply finished flushing.
%% If blocked by the credit flow, the credit grant will resume
%% processing; if limited by a batch, a batch continuation message
%% should be sent.
%% The continuation message is prioritised over publishes, but not
%% over consumptions, so the queue can make progress.
Blocked = credit_flow:blocked(),
case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of
%% Credit bump will continue paging
{true, _} -> ok;
%% Finished with paging
{false, false} -> ok;
%% Planning next batch
{false, true} ->
%% We don't want to use credit flow against ourselves, because it is
%% harder to reason about. Instead the process sends a (prioritised)
%% message to itself and sets waiting_bump so that at most one such
%% message sits in the mailbox.
case get(waiting_bump) of
true -> ok;
_ -> self() ! bump_reduce_memory_use,
put(waiting_bump, true)
end
end,
State3;
%% When using lazy queues, there are no alphas, so we don't need to
%% call push_alphas_to_betas/2.
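To summarise the batching logic above: each pass flushes at most twice the msg_store_credit_disc_bound credit, and a follow-up pass is scheduled whenever that cap (or an exhausted quota) cut the flush short. A condensed sketch of the capping decision (hypothetical module and function names, not part of this PR; the bound value in the example is illustrative):

-module(batch_sketch).
-export([cap_chunk/2]).

%% Chunk is what chunk_size/2 asked to flush; Bound is the first
%% element of msg_store_credit_disc_bound. Returns the amount to flush
%% now and whether a bump_reduce_memory_use follow-up is needed.
cap_chunk(Chunk, Bound) ->
    Actual = erlang:min(Chunk, Bound * 2),
    {Actual, Chunk > Actual}.

%% For example, cap_chunk(10000, 2000) -> {4000, true}: 4000 messages
%% are flushed in this pass and the remainder waits for the next bump.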