Skip to content

Commit

Permalink
Use rabbit_queue_behaviour and callback
Browse files Browse the repository at this point in the history
Alternative for #1388 that does not use process dictionary.

Requires rabbitmq/rabbitmq-common#228
  • Loading branch information
lukebakken committed Oct 12, 2017
1 parent 79fb8a1 commit 2821efd
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 18 deletions.
8 changes: 4 additions & 4 deletions src/rabbit_amqqueue_process.erl
Expand Up @@ -1386,10 +1386,10 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ,
%% rabbit_variable_queue:msg_store_write/4.
credit_flow:handle_bump_msg(Msg),
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
handle_info(bump_reduce_memory_use, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
put(waiting_bump, false),
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
handle_info(bump_reduce_memory_use, State = #q{ backing_queue = BQ,
backing_queue_state = BQS0 }) ->
BQS1 = BQ:handled_bump_reduce_memory_use(BQS0),
noreply(State#q{ backing_queue_state = BQ:resume(BQS1) });

handle_info(Info, State) ->
{stop, {unhandled_info, Info}, State}.
Expand Down
7 changes: 6 additions & 1 deletion src/rabbit_mirror_queue_master.erl
Expand Up @@ -24,7 +24,7 @@
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1,
msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
zip_msgs_and_acks/4]).
zip_msgs_and_acks/4, handled_bump_reduce_memory_use/1]).

-export([start/1, stop/0, delete_crashed/1]).

Expand Down Expand Up @@ -513,6 +513,11 @@ zip_msgs_and_acks(Msgs, AckTags, Accumulator,
backing_queue_state = BQS }) ->
BQ:zip_msgs_and_acks(Msgs, AckTags, Accumulator, BQS).

handled_bump_reduce_memory_use(State = #state{ backing_queue = BQ,
backing_queue_state = BQS0 }) ->
BQS1 = BQ:handled_bump_reduce_memory_use(BQS0),
State#state{ backing_queue_state = BQS1 }.

%% ---------------------------------------------------------------------------
%% Other exported functions
%% ---------------------------------------------------------------------------
Expand Down
5 changes: 4 additions & 1 deletion src/rabbit_priority_queue.erl
Expand Up @@ -41,7 +41,7 @@
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
zip_msgs_and_acks/4]).
zip_msgs_and_acks/4, handled_bump_reduce_memory_use/1]).

-record(state, {bq, bqss, max_priority}).
-record(passthrough, {bq, bqs}).
Expand Down Expand Up @@ -447,6 +447,9 @@ zip_msgs_and_acks(Msgs, AckTags, Accumulator,
#passthrough{bq = BQ, bqs = BQS}) ->
BQ:zip_msgs_and_acks(Msgs, AckTags, Accumulator, BQS).

handled_bump_reduce_memory_use(State = #passthrough{ bq = BQ, bqs = BQS }) ->
?passthrough1(handled_bump_reduce_memory_use(BQS)).

%%----------------------------------------------------------------------------

bq() ->
Expand Down
28 changes: 17 additions & 11 deletions src/rabbit_variable_queue.erl
Expand Up @@ -26,7 +26,8 @@
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
zip_msgs_and_acks/4, multiple_routing_keys/0]).
zip_msgs_and_acks/4, multiple_routing_keys/0,
handled_bump_reduce_memory_use/1]).

-export([start/1, stop/0]).

Expand Down Expand Up @@ -311,7 +312,8 @@
%% number of reduce_memory_usage executions, once it
%% reaches a threshold the queue will manually trigger a runtime GC
%% see: maybe_execute_gc/1
memory_reduction_run_count
memory_reduction_run_count,
waiting_bump
}).

-record(rates, { in, out, ack_in, ack_out, timestamp }).
Expand Down Expand Up @@ -2422,21 +2424,16 @@ reduce_memory_use(State = #vqstate {
Blocked = credit_flow:blocked(),
case {Blocked, NeedResumeA2B orelse NeedResumeB2D} of
%% Credit bump will continue paging
{true, _} -> ok;
{true, _} -> State3;
%% Finished with paging
{false, false} -> ok;
{false, false} -> State3;
%% Planning next batch
{false, true} ->
%% We don't want to use self-credit-flow, because it's harder to
%% reason about. So the process sends a (prioritised) message to
%% itself and sets a waiting_bump value to keep the message box clean
case get(waiting_bump) of
true -> ok;
_ -> self() ! bump_reduce_memory_use,
put(waiting_bump, waiting)
end
end,
State3;
maybe_bump_reduce_memory_use(State3)
end;
%% When using lazy queues, there are no alphas, so we don't need to
%% call push_alphas_to_betas/2.
reduce_memory_use(State = #vqstate {
Expand All @@ -2462,6 +2459,15 @@ reduce_memory_use(State = #vqstate {
garbage_collect(),
State3.

maybe_bump_reduce_memory_use(State = #vqstate{ waiting_bump = true }) ->
State;
maybe_bump_reduce_memory_use(State) ->
self() ! bump_reduce_memory_use,
State#vqstate{ waiting_bump = waiting }.

handled_bump_reduce_memory_use(State = #vqstate{ waiting_bump = waiting }) ->
State#vqstate{ waiting_bump = true }.

limit_ram_acks(0, State) ->
{0, ui(State)};
limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
Expand Down
6 changes: 5 additions & 1 deletion test/channel_operation_timeout_test_queue.erl
Expand Up @@ -26,7 +26,8 @@
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
zip_msgs_and_acks/4, multiple_routing_keys/0]).
zip_msgs_and_acks/4, multiple_routing_keys/0,
handled_bump_reduce_memory_use/1]).

-export([start/1, stop/0]).

Expand Down Expand Up @@ -710,6 +711,9 @@ zip_msgs_and_acks(Msgs, AckTags, Accumulator, _State) ->
[{Id, AckTag} | Acc]
end, Accumulator, lists:zip(Msgs, AckTags)).

handled_bump_reduce_memory_use(State) ->
State.

convert_to_lazy(State) ->
State1 = #vqstate { delta = Delta, q3 = Q3, len = Len } =
set_ram_duration_target(0, State),
Expand Down

0 comments on commit 2821efd

Please sign in to comment.