Skip to content


Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
branch: master
Fetching contributors…

Cannot retrieve contributors at this time

2085 lines (1889 sloc) 94.518 kb
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%% The Original Code is RabbitMQ.
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
purge/1, purge_acks/1,
publish/6, publish_delivered/5, discard/4, drain_confirmed/1,
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
info/2, invoke/3, is_duplicate/2, multiple_routing_keys/0]).
-export([start/1, stop/0]).
%% exported for testing only
-export([start_msg_store/2, stop_msg_store/0, init/6]).
%% Messages, and their position in the queue, can be in memory or on
%% disk, or both. Persistent messages will have both message and
%% position pushed to disk as soon as they arrive; transient messages
%% can be written to disk (and thus both types can be evicted from
%% memory) under memory pressure. The question of whether a message is
%% in RAM and whether it is persistent are orthogonal.
%% Messages are persisted using the queue index and the message
%% store. Normally the queue index holds the position of the message
%% *within this queue* along with a couple of small bits of metadata,
%% while the message store holds the message itself (including headers
%% and other properties).
%% However, as an optimisation, small messages can be embedded
%% directly in the queue index and bypass the message store
%% altogether.
%% Definitions:
%% alpha: this is a message where both the message itself, and its
%% position within the queue are held in RAM
%% beta: this is a message where the message itself is only held on
%% disk (if persisted to the message store) but its position
%% within the queue is held in RAM.
%% gamma: this is a message where the message itself is only held on
%% disk, but its position is both in RAM and on disk.
%% delta: this is a collection of messages, represented by a single
%% term, where the messages and their position are only held on
%% disk.
%% Note that for persistent messages, the message and its position
%% within the queue are always held on disk, *in addition* to being in
%% one of the above classifications.
%% Also note that within this code, the term gamma seldom
%% appears. It's frequently the case that gammas are defined by betas
%% who have had their queue position recorded on disk.
%% In general, messages move q1 -> q2 -> delta -> q3 -> q4, though
%% many of these steps are frequently skipped. q1 and q4 only hold
%% alphas, q2 and q3 hold both betas and gammas. When a message
%% arrives, its classification is determined. It is then added to the
%% rightmost appropriate queue.
%% If a new message is determined to be a beta or gamma, q1 is
%% empty. If a new message is determined to be a delta, q1 and q2 are
%% empty (and actually q4 too).
%% When removing messages from a queue, if q4 is empty then q3 is read
%% directly. If q3 becomes empty then the next segment's worth of
%% messages from delta are read into q3, reducing the size of
%% delta. If the queue is non empty, either q4 or q3 contain
%% entries. It is never permitted for delta to hold all the messages
%% in the queue.
%% The duration indicated to us by the memory_monitor is used to
%% calculate, given our current ingress and egress rates, how many
%% messages we should hold in RAM (i.e. as alphas). We track the
%% ingress and egress rates for both messages and pending acks and
%% rates for both are considered when calculating the number of
%% messages to hold in RAM. When we need to push alphas to betas or
%% betas to gammas, we favour writing out messages that are further
%% from the head of the queue. This minimises writes to disk, as the
%% messages closer to the tail of the queue stay in the queue for
%% longer, thus do not need to be replaced as quickly by sending other
%% messages to disk.
%% Whilst messages are pushed to disk and forgotten from RAM as soon
%% as requested by a new setting of the queue RAM duration, the
%% inverse is not true: we only load messages back into RAM as
%% demanded as the queue is read from. Thus only publishes to the
%% queue will take up available spare capacity.
%% When we report our duration to the memory monitor, we calculate
%% average ingress and egress rates over the last two samples, and
%% then calculate our duration based on the sum of the ingress and
%% egress rates. More than two samples could be used, but it's a
%% balance between responding quickly enough to changes in
%% producers/consumers versus ignoring temporary blips. The problem
%% with temporary blips is that with just a few queues, they can have
%% substantial impact on the calculation of the average duration and
%% hence cause unnecessary I/O. Another alternative is to increase the
%% amqqueue_process:RAM_DURATION_UPDATE_PERIOD to beyond 5
%% seconds. However, that then runs the risk of being too slow to
%% inform the memory monitor of changes. Thus a 5 second interval,
%% plus a rolling average over the last two samples seems to work
%% well in practice.
%% The sum of the ingress and egress rates is used because the egress
%% rate alone is not sufficient. Adding in the ingress rate means that
%% queues which are being flooded by messages are given more memory,
%% resulting in them being able to process the messages faster (by
%% doing less I/O, or at least deferring it) and thus helping keep
%% their mailboxes empty and thus the queue as a whole is more
%% responsive. If such a queue also has fast but previously idle
%% consumers, the consumer can then start to be driven as fast as it
%% can go, whereas if only egress rate was being used, the incoming
%% messages may have to be written to disk and then read back in,
%% resulting in the hard disk being a bottleneck in driving the
%% consumers. Generally, we want to give Rabbit every chance of
%% getting rid of messages as fast as possible and remaining
%% responsive, and using only the egress rate impacts that goal.
%% Once the queue has more alphas than the target_ram_count, the
%% surplus must be converted to betas, if not gammas, if not rolled
%% into delta. The conditions under which these transitions occur
%% reflect the conflicting goals of minimising RAM cost per msg, and
%% minimising CPU cost per msg. Once the msg has become a beta, its
%% payload is no longer in RAM, thus a read from the msg_store must
%% occur before the msg can be delivered, but the RAM cost of a beta
%% is the same as a gamma, so converting a beta to gamma will not free
%% up any further RAM. To reduce the RAM cost further, the gamma must
%% be rolled into delta. Whilst recovering a beta or a gamma to an
%% alpha requires only one disk read (from the msg_store), recovering
%% a msg from within delta will require two reads (queue_index and
%% then msg_store). But delta has a near-0 per-msg RAM cost. So the
%% conflict is between using delta more, which will free up more
%% memory, but require additional CPU and disk ops, versus using delta
%% less and gammas and betas more, which will cost more memory, but
%% require fewer disk ops and less CPU overhead.
%% In the case of a persistent msg published to a durable queue, the
%% msg is immediately written to the msg_store and queue_index. If
%% then additionally converted from an alpha, it'll immediately go to
%% a gamma (as it's already in queue_index), and cannot exist as a
%% beta. Thus a durable queue with a mixture of persistent and
%% transient msgs in it which has more messages than permitted by the
%% target_ram_count may contain an interspersed mixture of betas and
%% gammas in q2 and q3.
%% There is then a ratio that controls how many betas and gammas there
%% can be. This is based on the target_ram_count and thus expresses
%% the fact that as the number of permitted alphas in the queue falls,
%% so should the number of betas and gammas fall (i.e. delta
%% grows). If q2 and q3 contain more than the permitted number of
%% betas and gammas, then the surplus are forcibly converted to gammas
%% (as necessary) and then rolled into delta. The ratio is that
%% delta/(betas+gammas+delta) equals
%% (betas+gammas+delta)/(target_ram_count+betas+gammas+delta). I.e. as
%% the target_ram_count shrinks to 0, so must betas and gammas.
%% The conversion of betas to gammas is done in batches of at least
%% ?IO_BATCH_SIZE. This value should not be too small, otherwise the
%% frequent operations on the queues of q2 and q3 will not be
%% effectively amortised (switching the direction of queue access
%% defeats amortisation). Note that there is a natural upper bound due
%% to credit_flow limits on the alpha to beta conversion.
%% The conversion from alphas to betas is chunked due to the
%% credit_flow limits of the msg_store. This further smooths the
%% effects of changes to the target_ram_count and ensures the queue
%% remains responsive even when there is a large amount of IO work to
%% do. The 'resume' callback is utilised to ensure that conversions
%% are done as promptly as possible whilst ensuring the queue remains
%% responsive.
%% In the queue we keep track of both messages that are pending
%% delivery and messages that are pending acks. In the event of a
%% queue purge, we only need to load qi segments if the queue has
%% elements in deltas (i.e. it came under significant memory
%% pressure). In the event of a queue deletion, in addition to the
%% preceding, by keeping track of pending acks in RAM, we do not need
%% to search through qi segments looking for messages that are yet to
%% be acknowledged.
%% Pending acks are recorded in memory by storing the message itself.
%% If the message has been sent to disk, we do not store the message
%% content. During memory reduction, pending acks containing message
%% content have that content removed and the corresponding messages
%% are pushed out to disk.
%% Messages from pending acks are returned to q4, q3 and delta during
%% requeue, based on the limits of seq_id contained in each. Requeued
%% messages retain their original seq_id, maintaining order
%% when requeued.
%% The order in which alphas are pushed to betas and pending acks
%% are pushed to disk is determined dynamically. We always prefer to
%% push messages for the source (alphas or acks) that is growing the
%% fastest (with growth measured as avg. ingress - avg. egress).
%% Notes on Clean Shutdown
%% (This documents behaviour in variable_queue, queue_index and
%% msg_store.)
%% In order to try to achieve as fast a start-up as possible, if a
%% clean shutdown occurs, we try to save out state to disk to reduce
%% work on startup. In the msg_store this takes the form of the
%% index_module's state, plus the file_summary ets table, and client
%% refs. In the VQ, this takes the form of the count of persistent
%% messages in the queue and references into the msg_stores. The
%% queue_index adds to these terms the details of its segments and
%% stores the terms in the queue directory.
%% Two message stores are used. One is created for persistent messages
%% to durable queues that must survive restarts, and the other is used
%% for all other messages that just happen to need to be written to
%% disk. On start up we can therefore nuke the transient message
%% store, and be sure that the messages in the persistent store are
%% all that we need.
%% The references to the msg_stores are there so that the msg_store
%% knows to only trust its saved state if all of the queues it was
%% previously talking to come up cleanly. Likewise, the queues
%% themselves (esp queue_index) skips work in init if all the queues
%% and msg_store were shutdown cleanly. This gives both good speed
%% improvements and also robustness so that if anything possibly went
%% wrong in shutdown (or there was subsequent manual tampering), all
%% messages and queues that can be recovered are recovered, safely.
%% To delete transient messages lazily, the variable_queue, on
%% startup, stores the next_seq_id reported by the queue_index as the
%% transient_threshold. From that point on, whenever it's reading a
%% message off disk via the queue_index, if the seq_id is below this
%% threshold and the message is transient then it drops the message
%% (the message itself won't exist on disk because it would have been
%% stored in the transient msg_store which would have had its saved
%% state nuked on startup). This avoids the expensive operation of
%% scanning the entire queue on startup in order to delete transient
%% messages that were only pushed to disk to save memory.
{ q1,
ram_pending_ack, %% msgs using store, still in RAM
disk_pending_ack, %% msgs in store, paged out
qi_pending_ack, %% msgs using qi, *can't* be paged out
len, %% w/o unacked
bytes, %% w/o unacked
persistent_count, %% w unacked
persistent_bytes, %% w unacked
ram_msg_count, %% w/o unacked
ram_bytes, %% w unacked
%% Unlike the other counters these two do not feed into
%% #rates{} and get reset
-record(rates, { in, out, ack_in, ack_out, timestamp }).
{ seq_id,
{ start_seq_id, %% start_seq_id is inclusive
end_seq_id %% end_seq_id is exclusive
%% When we discover that we should write some indices to disk for some
%% betas, the IO_BATCH_SIZE sets the number of betas that we must be
%% due to write indices for before we do any work at all.
-define(IO_BATCH_SIZE, 2048). %% next power-of-2 after ?CREDIT_DISC_BOUND
-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
-define(QUEUE, lqueue).
-rabbit_upgrade({multiple_routing_keys, local, []}).
-type(seq_id() :: non_neg_integer()).
-type(rates() :: #rates { in :: float(),
out :: float(),
ack_in :: float(),
ack_out :: float(),
timestamp :: rabbit_types:timestamp()}).
-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
count :: non_neg_integer(),
end_seq_id :: non_neg_integer() }).
%% The compiler (rightfully) complains that ack() and state() are
%% unused. For this reason we duplicate a -spec from
%% rabbit_backing_queue with the only intent being to remove
%% warnings. The problem here is that we can't parameterise the BQ
%% behaviour by these two types as we would like to. We still leave
%% these here for documentation purposes.
-type(ack() :: seq_id()).
-type(state() :: #vqstate {
q1 :: ?QUEUE:?QUEUE(),
q2 :: ?QUEUE:?QUEUE(),
delta :: delta(),
q3 :: ?QUEUE:?QUEUE(),
q4 :: ?QUEUE:?QUEUE(),
next_seq_id :: seq_id(),
ram_pending_ack :: gb_trees:tree(),
disk_pending_ack :: gb_trees:tree(),
qi_pending_ack :: gb_trees:tree(),
index_state :: any(),
msg_store_clients :: 'undefined' | {{any(), binary()},
{any(), binary()}},
durable :: boolean(),
transient_threshold :: non_neg_integer(),
len :: non_neg_integer(),
bytes :: non_neg_integer(),
unacked_bytes :: non_neg_integer(),
persistent_count :: non_neg_integer(),
persistent_bytes :: non_neg_integer(),
target_ram_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(),
ram_ack_count_prev :: non_neg_integer(),
ram_bytes :: non_neg_integer(),
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
rates :: rates(),
msgs_on_disk :: gb_sets:set(),
msg_indices_on_disk :: gb_sets:set(),
unconfirmed :: gb_sets:set(),
confirmed :: gb_sets:set(),
ack_out_counter :: non_neg_integer(),
ack_in_counter :: non_neg_integer(),
disk_read_count :: non_neg_integer(),
disk_write_count :: non_neg_integer() }).
%% Duplicated from rabbit_backing_queue
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
-spec(multiple_routing_keys/0 :: () -> 'ok').
-define(BLANK_DELTA, #delta { start_seq_id = undefined,
count = 0,
end_seq_id = undefined }).
-define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z,
count = 0,
end_seq_id = Z }).
-define(MICROS_PER_SECOND, 1000000.0).
%% We're sampling every 5s for RAM duration; a half life that is of
%% the same order of magnitude is probably about right.
-define(RATE_AVG_HALF_LIFE, 5.0).
%% We will recalculate the #rates{} every time we get asked for our
%% RAM duration, or every N messages published, whichever is
%% sooner. We do this since the priority calculations in
%% rabbit_amqqueue_process need fairly fresh rates.
-define(MSGS_PER_RATE_CALC, 100).
%% Public API
start(DurableQueues) ->
{AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues),
[Ref || Terms <- AllTerms,
Terms /= non_clean_shutdown,
Ref = proplists:get_value(persistent_ref, Terms),
Ref =/= undefined
{ok, AllTerms}.
stop() ->
ok = stop_msg_store(),
ok = rabbit_queue_index:stop().
start_msg_store(Refs, StartFunState) ->
ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
[?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
undefined, {fun (ok) -> finished end, ok}]),
ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store,
[?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
Refs, StartFunState]).
stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
init(Queue, Recover, Callback) ->
Queue, Recover, Callback,
fun (MsgIds, ActionTaken) ->
msgs_written_to_disk(Callback, MsgIds, ActionTaken)
fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end,
fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end).
init(#amqqueue { name = QueueName, durable = IsDurable }, new,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
IndexState = rabbit_queue_index:init(QueueName,
MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, 0, 0, [],
case IsDurable of
true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
MsgOnDiskFun, AsyncCallback);
false -> undefined
msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
%% We can be recovering a transient queue if it crashed
init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
{PRef, RecoveryTerms} = process_recovery_terms(Terms),
{PersistentClient, ContainsCheckFun} =
case IsDurable of
true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
MsgOnDiskFun, AsyncCallback),
{C, fun (MsgId) when is_binary(MsgId) ->
rabbit_msg_store:contains(MsgId, C);
(#basic_message{is_persistent = Persistent}) ->
false -> {undefined, fun(_MsgId) -> false end}
TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
undefined, AsyncCallback),
{DeltaCount, DeltaBytes, IndexState} =
QueueName, RecoveryTerms,
ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
PersistentClient, TransientClient).
process_recovery_terms(Terms=non_clean_shutdown) ->
{rabbit_guid:gen(), Terms};
process_recovery_terms(Terms) ->
case proplists:get_value(persistent_ref, Terms) of
undefined -> {rabbit_guid:gen(), []};
PRef -> {PRef, Terms}
terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
persistent_bytes = PBytes,
index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
purge_pending_ack(true, State),
PRef = case MSCStateP of
undefined -> undefined;
_ -> ok = rabbit_msg_store:client_terminate(MSCStateP),
ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT),
Terms = [{persistent_ref, PRef},
{persistent_count, PCount},
{persistent_bytes, PBytes}],
a(State1 #vqstate { index_state = rabbit_queue_index:terminate(
Terms, IndexState),
msg_store_clients = undefined }).
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
delete_and_terminate(_Reason, State) ->
%% TODO: there is no need to interact with qi at all - which we do
%% as part of 'purge' and 'purge_pending_ack', other than
%% deleting it.
{_PurgeCount, State1} = purge(State),
State2 = #vqstate { index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
purge_pending_ack(false, State1),
IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState),
case MSCStateP of