Skip to content
Browse files

Updated to RabbitMQ 2.8.1

  • Loading branch information...
1 parent 16838c0 commit be44f23c70caed4b753f4c9eafc00581c0863785 @jbrisbin committed Mar 26, 2012
View
6 README.md
@@ -6,21 +6,21 @@ This is a fork of the rabbit_common dependency, which is needed by the
It's meant to be included in your rebar projects in your rebar.config file:
{deps, [
- {rabbit_common, ".*", {git, "git://github.com/jbrisbin/rabbit_common.git", "rabbitmq_2.7.1"}}
+ {rabbit_common, ".*", {git, "git://github.com/jbrisbin/rabbit_common.git", "rabbitmq_2.8.1"}}
]}.
The "master" branch of this port is a simple re-packaging of the rabbit_common AMQP client dependency.
The "community" branch, however, is a port of the RabbitMQ source code with additional strict compilation
checking turned on and the source code edited to eliminate warnings. It should be 100% compatible with the
unaltered source code. The community branch is simply a tweak to allow projects that depend on rabbit_common
-and also have strict compilation options turned on with this project introducing warnings into those projects.
+to not have to deal with the warnings issued by the compiler in the unaltered RabbitMQ code.
To use the "community" branch in your project, which includes stricter compilation settings, add "-community"
to the version tag:
{deps, [
- {rabbit_common, ".*", {git, "git://github.com/jbrisbin/rabbit_common.git", "rabbitmq_2.7.1-community"}}
+ {rabbit_common, ".*", {git, "git://github.com/jbrisbin/rabbit_common.git", "rabbitmq_2.8.1-community"}}
]}.
### License
View
2 include/gm_specs.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-ifdef(use_specs).
View
17 include/rabbit.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-record(user, {username,
@@ -56,9 +56,11 @@
-record(binding, {source, key, destination, args = []}).
-record(reverse_binding, {destination, key, source, args = []}).
+-record(topic_trie_node, {trie_node, edge_count, binding_count}).
-record(topic_trie_edge, {trie_edge, node_id}).
-record(topic_trie_binding, {trie_binding, value = const}).
+-record(trie_node, {exchange_name, node_id}).
-record(trie_edge, {exchange_name, node_id, word}).
-record(trie_binding, {exchange_name, node_id, destination}).
@@ -84,7 +86,7 @@
%%----------------------------------------------------------------------------
--define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2011 VMware, Inc.").
+-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2012 VMware, Inc.").
-define(INFORMATION_MESSAGE, "Licensed under the MPL. See http://www.rabbitmq.com/").
-define(PROTOCOL_VERSION, "AMQP 0-9-1 / 0-9 / 0-8").
-define(ERTS_MINIMUM, "5.6.3").
@@ -93,16 +95,7 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
+-define(CREDIT_DISC_BOUND, {2000, 500}).
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
-
--ifdef(debug).
--define(LOGDEBUG0(F), rabbit_log:debug(F)).
--define(LOGDEBUG(F,A), rabbit_log:debug(F,A)).
--define(LOGMESSAGE(D,C,M,Co), rabbit_log:message(D,C,M,Co)).
--else.
--define(LOGDEBUG0(F), ok).
--define(LOGDEBUG(F,A), ok).
--define(LOGMESSAGE(D,C,M,Co), ok).
--endif.
View
2 include/rabbit_auth_backend_spec.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-ifdef(use_specs).
View
2 include/rabbit_auth_mechanism_spec.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-ifdef(use_specs).
View
10 include/rabbit_backing_queue_spec.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-type(fetch_result(Ack) ::
@@ -25,6 +25,8 @@
-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(duration() :: ('undefined' | 'infinity' | number())).
+-type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok')).
+
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(init/3 :: (rabbit_types:amqqueue(), attempt_recovery(),
@@ -42,12 +44,14 @@
rabbit_types:message_properties(), pid(), state())
-> {undefined, state()}).
-spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}).
--spec(dropwhile/2 ::
- (fun ((rabbit_types:message_properties()) -> boolean()), state())
+-spec(dropwhile/3 ::
+ (fun ((rabbit_types:message_properties()) -> boolean()),
+ msg_fun() | 'undefined', state())
-> state()).
-spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}).
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
+-spec(fold/3 :: (msg_fun(), state(), [ack()]) -> state()).
-spec(requeue/2 :: ([ack()], state())
-> {[rabbit_guid:guid()], state()}).
-spec(len/1 :: (state()) -> non_neg_integer()).
View
2 include/rabbit_exchange_type_spec.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-ifdef(use_specs).
View
2 include/rabbit_framing.hrl
@@ -13,7 +13,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-define(PROTOCOL_PORT, 5672).
-define(FRAME_METHOD, 1).
View
2 include/rabbit_msg_store.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-include("rabbit.hrl").
View
2 include/rabbit_msg_store_index.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-include("rabbit_msg_store.hrl").
View
2 src/gen_server2.erl
@@ -73,7 +73,7 @@
%% but where the second argument is specifically the priority_queue
%% which contains the prioritised message_queue.
-%% All modifications are (C) 2009-2011 VMware, Inc.
+%% All modifications are (C) 2009-2012 VMware, Inc.
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
View
25 src/mirrored_supervisor.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-module(mirrored_supervisor).
@@ -144,32 +144,17 @@
-type child() :: pid() | 'undefined'.
-type child_id() :: term().
--type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | 'undefined'}.
-type modules() :: [module()] | 'dynamic'.
--type restart() :: 'permanent' | 'transient' | 'temporary'.
--type shutdown() :: 'brutal_kill' | timeout().
-type worker() :: 'worker' | 'supervisor'.
-type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}.
-type sup_ref() :: (Name :: atom())
| {Name :: atom(), Node :: node()}
| {'global', Name :: atom()}
| pid().
--type child_spec() :: {Id :: child_id(),
- StartFunc :: mfargs(),
- Restart :: restart(),
- Shutdown :: shutdown(),
- Type :: worker(),
- Modules :: modules()}.
-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term().
-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}.
--type startchild_err() :: 'already_present'
- | {'already_started', Child :: child()} | term().
--type startchild_ret() :: {'ok', Child :: child()}
- | {'ok', Child :: child(), Info :: term()}
- | {'error', startchild_err()}.
-
-type group_name() :: any().
-spec start_link(GroupName, Module, Args) -> startlink_ret() when
@@ -183,9 +168,9 @@
Module :: module(),
Args :: term().
--spec start_child(SupRef, ChildSpec) -> startchild_ret() when
+-spec start_child(SupRef, ChildSpec) -> supervisor:startchild_ret() when
SupRef :: sup_ref(),
- ChildSpec :: child_spec() | (List :: [term()]).
+ ChildSpec :: supervisor:child_spec() | (List :: [term()]).
-spec restart_child(SupRef, Id) -> Result when
SupRef :: sup_ref(),
@@ -215,12 +200,12 @@
Modules :: modules().
-spec check_childspecs(ChildSpecs) -> Result when
- ChildSpecs :: [child_spec()],
+ ChildSpecs :: [supervisor:child_spec()],
Result :: 'ok' | {'error', Error :: term()}.
-spec start_internal(Group, ChildSpecs) -> Result when
Group :: group_name(),
- ChildSpecs :: [child_spec()],
+ ChildSpecs :: [supervisor:child_spec()],
Result :: startlink_ret().
-spec create_tables() -> Result when
View
2 src/priority_queue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% Priority queues have essentially the same interface as ordinary
View
151 src/rabbit_amqqueue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_amqqueue).
@@ -20,12 +20,12 @@
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
- stat/1, deliver/2, requeue/3, ack/3, reject/4]).
+ stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2, unblock/2, flush_all/2]).
+-export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]).
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-export([store_queue/1]).
@@ -40,21 +40,25 @@
-define(INTEGER_ARG_TYPES, [byte, short, signedint, long]).
+-define(MORE_CONSUMER_CREDIT_AFTER, 50).
+
+-define(FAILOVER_WAIT_MILLIS, 100).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--export_type([name/0, qmsg/0]).
+-export_type([name/0, qmsg/0, routing_result/0]).
-type(name() :: rabbit_types:r('queue')).
-
+-type(qpids() :: [pid()]).
-type(qlen() :: rabbit_types:ok(non_neg_integer())).
-type(qfun(A) :: fun ((rabbit_types:amqqueue()) -> A | no_return())).
-type(qmsg() :: {name(), pid(), msg_id(), boolean(), rabbit_types:message()}).
-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
-
+-type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
-spec(start/0 :: () -> [name()]).
@@ -69,7 +73,8 @@
-> queue_or_not_found() | rabbit_misc:thunk(queue_or_not_found())).
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:amqqueue()) |
- rabbit_types:error('not_found')).
+ rabbit_types:error('not_found');
+ ([name()]) -> [rabbit_types:amqqueue()]).
-spec(with/2 :: (name(), qfun(A)) -> A | rabbit_types:error('not_found')).
-spec(with_or_die/2 ::
(name(), qfun(A)) -> A | rabbit_types:channel_exit()).
@@ -117,12 +122,15 @@
rabbit_types:error('in_use') |
rabbit_types:error('not_empty')).
-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
--spec(deliver/2 :: (pid(), rabbit_types:delivery()) -> boolean()).
+-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
+ {routing_result(), qpids()}).
+-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
+ {routing_result(), qpids()}).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
--spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
--spec(limit_all/3 :: ([pid()], pid(), rabbit_limiter:token()) ->
+-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()).
+-spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) ->
ok_or_errors()).
-spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
@@ -133,8 +141,9 @@
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
+-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(flush_all/2 :: ([pid()], pid()) -> 'ok').
+-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
rabbit_types:connection_exit() |
@@ -264,6 +273,10 @@ add_default_binding(#amqqueue{name = QueueName}) ->
key = RoutingKey,
args = []}).
+lookup(Names) when is_list(Names) ->
+ %% Normally we'd call mnesia:dirty_read/1 here, but that is quite
+ %% expensive for reasons explained in rabbit_misc:dirty_read/1.
+ lists:append([ets:lookup(rabbit_queue, Name) || Name <- Names]);
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_queue, Name}).
@@ -323,12 +336,23 @@ check_declare_arguments(QueueName, Args) ->
precondition_failed,
"invalid arg '~s' for ~s: ~255p",
[Key, rabbit_misc:rs(QueueName), Error])
- end || {Key, Fun} <-
+ end ||
+ {Key, Fun} <-
[{<<"x-expires">>, fun check_integer_argument/2},
{<<"x-message-ttl">>, fun check_integer_argument/2},
- {<<"x-ha-policy">>, fun check_ha_policy_argument/2}]],
+ {<<"x-ha-policy">>, fun check_ha_policy_argument/2},
+ {<<"x-dead-letter-exchange">>, fun check_string_argument/2},
+ {<<"x-dead-letter-routing-key">>,
+ fun check_dlxrk_argument/2}]],
ok.
+check_string_argument(undefined, _Args) ->
+ ok;
+check_string_argument({longstr, _}, _Args) ->
+ ok;
+check_string_argument({Type, _}, _) ->
+ {error, {unacceptable_type, Type}}.
+
check_integer_argument(undefined, _Args) ->
ok;
check_integer_argument({Type, Val}, _Args) when Val > 0 ->
@@ -339,6 +363,16 @@ check_integer_argument({Type, Val}, _Args) when Val > 0 ->
check_integer_argument({_Type, Val}, _Args) ->
{error, {value_zero_or_less, Val}}.
+check_dlxrk_argument(undefined, _Args) ->
+ ok;
+check_dlxrk_argument({longstr, _}, Args) ->
+ case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of
+ undefined -> {error, routing_key_but_no_dlx_defined};
+ _ -> ok
+ end;
+check_dlxrk_argument({Type, _}, _Args) ->
+ {error, {unacceptable_type, Type}}.
+
check_ha_policy_argument(undefined, _Args) ->
ok;
check_ha_policy_argument({longstr, <<"all">>}, _Args) ->
@@ -389,9 +423,26 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
+%% We need to account for the idea that queues may be mid-promotion
+%% during force_event_refresh (since it's likely we're doing this in
+%% the first place since a node failed). Therefore we keep poking at
+%% the list of queues until we were able to talk to a live process or
+%% the queue no longer exists.
force_event_refresh() ->
- [gen_server2:cast(Q#amqqueue.pid, force_event_refresh) || Q <- list()],
- ok.
+ force_event_refresh([Q#amqqueue.name || Q <- list()]).
+
+force_event_refresh(QNames) ->
+ Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)],
+ {_, Bad} = rabbit_misc:multi_call(
+ [Q#amqqueue.pid || Q <- Qs], force_event_refresh),
+ FailedPids = [Pid || {Pid, _Reason} <- Bad],
+ Failed = [Name || #amqqueue{name = Name, pid = Pid} <- Qs,
+ lists:member(Pid, FailedPids)],
+ case Failed of
+ [] -> ok;
+ _ -> timer:sleep(?FAILOVER_WAIT_MILLIS),
+ force_event_refresh(Failed)
+ end.
consumers(#amqqueue{ pid = QPid }) ->
delegate_call(QPid, consumers).
@@ -419,14 +470,9 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge).
-deliver(QPid, Delivery = #delivery{immediate = true}) ->
- gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity);
-deliver(QPid, Delivery = #delivery{mandatory = true}) ->
- gen_server2:call(QPid, {deliver, Delivery}, infinity),
- true;
-deliver(QPid, Delivery) ->
- gen_server2:cast(QPid, {deliver, Delivery}),
- true.
+deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
+
+deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow).
requeue(QPid, MsgIds, ChPid) ->
delegate_call(QPid, {requeue, MsgIds, ChPid}).
@@ -458,7 +504,21 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
notify_sent(QPid, ChPid) ->
- gen_server2:cast(QPid, {notify_sent, ChPid}).
+ Key = {consumer_credit_to, QPid},
+ put(Key, case get(Key) of
+ 1 -> gen_server2:cast(
+ QPid, {notify_sent, ChPid,
+ ?MORE_CONSUMER_CREDIT_AFTER}),
+ ?MORE_CONSUMER_CREDIT_AFTER;
+ undefined -> erlang:monitor(process, QPid),
+ ?MORE_CONSUMER_CREDIT_AFTER - 1;
+ C -> C - 1
+ end),
+ ok.
+
+notify_sent_queue_down(QPid) ->
+ erase({consumer_credit_to, QPid}),
+ ok.
unblock(QPid, ChPid) ->
delegate_cast(QPid, {unblock, ChPid}).
@@ -518,6 +578,49 @@ pseudo_queue(QueueName, Pid) ->
slave_pids = [],
mirror_nodes = undefined}.
+deliver([], #delivery{mandatory = false, immediate = false}, _Flow) ->
+ %% /dev/null optimisation
+ {routed, []};
+
+deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) ->
+ %% optimisation: when Mandatory = false and Immediate = false,
+ %% rabbit_amqqueue:deliver will deliver the message to the queue
+ %% process asynchronously, and return true, which means all the
+ %% QPids will always be returned. It is therefore safe to use a
+ %% fire-and-forget cast here and return the QPids - the semantics
+ %% is preserved. This scales much better than the non-immediate
+ %% case below.
+ QPids = qpids(Qs),
+ case Flow of
+ flow -> [credit_flow:send(QPid) || QPid <- QPids];
+ noflow -> ok
+ end,
+ delegate:invoke_no_result(
+ QPids, fun (QPid) ->
+ gen_server2:cast(QPid, {deliver, Delivery, Flow})
+ end),
+ {routed, QPids};
+
+deliver(Qs, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate},
+ _Flow) ->
+ QPids = qpids(Qs),
+ {Success, _} =
+ delegate:invoke(
+ QPids, fun (QPid) ->
+ gen_server2:call(QPid, {deliver, Delivery}, infinity)
+ end),
+ case {Mandatory, Immediate,
+ lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]};
+ ({_, false}, {_, H}) -> {true, H}
+ end, {false, []}, Success)} of
+ {true, _ , {false, []}} -> {unroutable, []};
+ {_ , true, {_ , []}} -> {not_delivered, []};
+ {_ , _ , {_ , R}} -> {routed, R}
+ end.
+
+qpids(Qs) -> lists:append([[QPid | SPids] ||
+ #amqqueue{pid = QPid, slave_pids = SPids} <- Qs]).
+
safe_delegate_call_ok(F, Pids) ->
case delegate:invoke(Pids, fun (Pid) ->
rabbit_misc:with_exit_handler(
View
2 src/rabbit_auth_backend.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_backend).
View
2 src/rabbit_auth_mechanism.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_auth_mechanism).
View
16 src/rabbit_backing_queue.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_backing_queue).
@@ -95,16 +95,24 @@ behaviour_info(callbacks) ->
{drain_confirmed, 1},
%% Drop messages from the head of the queue while the supplied
- %% predicate returns true.
- {dropwhile, 2},
+ %% predicate returns true. A callback function is supplied
+ %% allowing callers access to messages that are about to be
+ %% dropped.
+ {dropwhile, 3},
%% Produce the next message.
{fetch, 2},
%% Acktags supplied are for messages which can now be forgotten
- %% about. Must return 1 msg_id per Ack, in the same order as Acks.
+ %% about. Must return 1 msg_id per Ack, in the same order as
+ %% Acks.
{ack, 2},
+ %% Acktags supplied are for messages which should be
+ %% processed. The provided callback function is called with each
+ %% message.
+ {fold, 3},
+
%% Reinsert messages into the queue which have already been
%% delivered and were pending acknowledgement.
{requeue, 2},
View
43 src/rabbit_basic.erl
@@ -11,15 +11,16 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_basic).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
-export([publish/4, publish/6, publish/1,
- message/3, message/4, properties/1, delivery/4]).
+ message/3, message/4, properties/1, append_table_header/3,
+ extract_headers/1, replace_headers/2, delivery/4, header_routes/1]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -29,8 +30,9 @@
-type(properties_input() ::
(rabbit_framing:amqp_property_record() | [{atom(), any()}])).
-type(publish_result() ::
- ({ok, rabbit_router:routing_result(), [pid()]}
+ ({ok, rabbit_amqqueue:routing_result(), [pid()]}
| rabbit_types:error('not_found'))).
+-type(headers() :: rabbit_framing:amqp_table() | 'undefined').
-type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())).
-type(body_input() :: (binary() | [binary()])).
@@ -55,6 +57,17 @@
rabbit_types:ok_or_error2(rabbit_types:message(), any())).
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
+
+-spec(append_table_header/3 ::
+ (binary(), rabbit_framing:amqp_table(), headers()) -> headers()).
+
+-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()).
+
+-spec(replace_headers/2 :: (headers(), rabbit_types:content())
+ -> rabbit_types:content()).
+
+-spec(header_routes/1 ::
+ (undefined | rabbit_framing:amqp_table()) -> [string()]).
-spec(build_content/2 :: (rabbit_framing:amqp_property_record(),
binary() | [binary()]) -> rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
@@ -88,8 +101,8 @@ publish(Delivery = #delivery{
end.
publish(X, Delivery) ->
- {RoutingRes, DeliveredQPids} =
- rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery),
+ Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)),
+ {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery),
{ok, RoutingRes, DeliveredQPids}.
delivery(Mandatory, Immediate, Message, MsgSeqNo) ->
@@ -139,7 +152,7 @@ message(XName, RoutingKey, #content{properties = Props} = DecodedContent) ->
{ok, #basic_message{
exchange_name = XName,
content = strip_header(DecodedContent, ?DELETED_HEADER),
- id = rabbit_guid:guid(),
+ id = rabbit_guid:gen(),
is_persistent = is_message_persistent(DecodedContent),
routing_keys = [RoutingKey |
header_routes(Props#'P_basic'.headers)]}}
@@ -166,6 +179,24 @@ properties(P) when is_list(P) ->
end
end, #'P_basic'{}, P).
+append_table_header(Name, Info, undefined) ->
+ append_table_header(Name, Info, []);
+append_table_header(Name, Info, Headers) ->
+ Prior = case rabbit_misc:table_lookup(Headers, Name) of
+ undefined -> [];
+ {array, Existing} -> Existing
+ end,
+ rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]).
+
+extract_headers(Content) ->
+ #content{properties = #'P_basic'{headers = Headers}} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ Headers.
+
+replace_headers(Headers, Content = #content{properties = Props}) ->
+ rabbit_binary_generator:clear_encoded_content(
+ Content#content{properties = Props#'P_basic'{headers = Headers}}).
+
indexof(L, Element) -> indexof(L, Element, 1).
indexof([], _Element, _N) -> 0;
View
2 src/rabbit_binary_generator.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_binary_generator).
View
2 src/rabbit_binary_parser.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_binary_parser).
View
326 src/rabbit_channel.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_channel).
@@ -20,8 +20,8 @@
-behaviour(gen_server2).
--export([start_link/10, do/2, do/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, flushed/2, confirm/2]).
+-export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
+-export([send_command/2, deliver/4, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
-export([force_event_refresh/0]).
@@ -33,9 +33,9 @@
-export([list_local/0]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- limiter, tx_status, next_tag,
- unacked_message_q, uncommitted_message_q, uncommitted_acks,
- user, virtual_host, most_recently_declared_queue, queue_monitors,
+ conn_name, limiter, tx_status, next_tag, unacked_message_q,
+ uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
+ virtual_host, most_recently_declared_queue, queue_monitors,
consumer_mapping, blocking, queue_consumers, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
unconfirmed_qm, confirmed, capabilities, trace_state}).
@@ -56,6 +56,7 @@
-define(CREATION_EVENT_KEYS,
[pid,
+ name,
connection,
number,
user,
@@ -71,21 +72,23 @@
-type(channel_number() :: non_neg_integer()).
--spec(start_link/10 ::
- (channel_number(), pid(), pid(), pid(), rabbit_types:protocol(),
- rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
+-spec(start_link/11 ::
+ (channel_number(), pid(), pid(), pid(), string(),
+ rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
+ rabbit_framing:amqp_table(),
pid(), rabbit_limiter:token()) -> rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
+-spec(do_flow/3 :: (pid(), rabbit_framing:amqp_method_record(),
+ rabbit_types:maybe(rabbit_types:content())) -> 'ok').
-spec(flush/1 :: (pid()) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(deliver/4 ::
(pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg())
-> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
--spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
@@ -101,17 +104,21 @@
%%----------------------------------------------------------------------------
-start_link(Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
- Capabilities, CollectorPid, Limiter) ->
+start_link(Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User,
+ VHost, Capabilities, CollectorPid, Limiter) ->
gen_server2:start_link(
- ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, Protocol, User,
- VHost, Capabilities, CollectorPid, Limiter], []).
+ ?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol,
+ User, VHost, Capabilities, CollectorPid, Limiter], []).
do(Pid, Method) ->
do(Pid, Method, none).
do(Pid, Method, Content) ->
- gen_server2:cast(Pid, {method, Method, Content}).
+ gen_server2:cast(Pid, {method, Method, Content, noflow}).
+
+do_flow(Pid, Method, Content) ->
+ credit_flow:send(Pid),
+ gen_server2:cast(Pid, {method, Method, Content, flow}).
flush(Pid) ->
gen_server2:call(Pid, flush, infinity).
@@ -128,9 +135,6 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) ->
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
-confirm(Pid, MsgSeqNos) ->
- gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
-
list() ->
rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(),
rabbit_channel, list_local, []).
@@ -169,7 +173,7 @@ force_event_refresh() ->
%%---------------------------------------------------------------------------
-init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
+init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
Capabilities, CollectorPid, Limiter]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
@@ -179,16 +183,18 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
reader_pid = ReaderPid,
writer_pid = WriterPid,
conn_pid = ConnPid,
+ conn_name = ConnName,
limiter = Limiter,
tx_status = none,
next_tag = 1,
unacked_message_q = queue:new(),
uncommitted_message_q = queue:new(),
uncommitted_acks = [],
+ uncommitted_nacks = [],
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- queue_monitors = dict:new(),
+ queue_monitors = sets:new(),
consumer_mapping = dict:new(),
blocking = sets:new(),
queue_consumers = dict:new(),
@@ -244,7 +250,12 @@ handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
handle_call(_Request, _From, State) ->
noreply(State).
-handle_cast({method, Method, Content}, State) ->
+handle_cast({method, Method, Content, Flow},
+ State = #ch{reader_pid = Reader}) ->
+ case Flow of
+ flow -> credit_flow:ack(Reader);
+ noflow -> ok
+ end,
try handle_method(Method, Content, State) of
{reply, Reply, NewState} ->
ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
@@ -284,29 +295,19 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
handle_cast({deliver, ConsumerTag, AckRequired,
Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_keys = [RoutingKey | _CcRoutes],
- content = Content}}},
- State = #ch{writer_pid = WriterPid,
- next_tag = DeliveryTag,
- trace_state = TraceState}) ->
- State1 = lock_message(AckRequired,
- ack_record(DeliveryTag, ConsumerTag, Msg),
- State),
-
- M = #'basic.deliver'{consumer_tag = ConsumerTag,
- delivery_tag = DeliveryTag,
- redelivered = Redelivered,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey},
- rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
- State2 = maybe_incr_stats([{QPid, 1}], case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end, State1),
- State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
- rabbit_trace:tap_trace_out(Msg, TraceState),
- noreply(State3#ch{next_tag = DeliveryTag + 1});
-
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content}}},
+ State = #ch{writer_pid = WriterPid,
+ next_tag = DeliveryTag}) ->
+ ok = rabbit_writer:send_command_and_notify(
+ WriterPid, QPid, self(),
+ #'basic.deliver'{consumer_tag = ConsumerTag,
+ delivery_tag = DeliveryTag,
+ redelivered = Redelivered,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey},
+ Content),
+ noreply(record_sent(ConsumerTag, AckRequired, Msg, State));
handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
@@ -315,6 +316,10 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
+handle_info({bump_credit, Msg}, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ noreply(State);
+
handle_info(timeout, State) ->
noreply(State);
@@ -327,9 +332,10 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
State2 = queue_blocked(QPid, State1),
State3 = handle_consuming_queue_down(QPid, State2),
+ credit_flow:peer_down(QPid),
erase_queue_stats(QPid),
noreply(State3#ch{queue_monitors =
- dict:erase(QPid, State3#ch.queue_monitors)});
+ sets:del_element(QPid, State3#ch.queue_monitors)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -527,7 +533,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
#'channel.flow_ok'{active = false});
_ -> ok
end,
- demonitor_queue(QPid, State#ch{blocking = Blocking1})
+ State#ch{blocking = Blocking1}
end.
record_confirm(undefined, _, State) ->
@@ -565,8 +571,7 @@ remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs},
MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
case gb_sets:is_empty(MsgSeqNos1) of
true -> UQM1 = gb_trees:delete(QPid, UQM),
- demonitor_queue(
- QPid, State#ch{unconfirmed_qm = UQM1});
+ State#ch{unconfirmed_qm = UQM1};
false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
State#ch{unconfirmed_qm = UQM1}
end;
@@ -672,45 +677,36 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
State1 = State#ch{unacked_message_q = Remaining},
{noreply,
case TxStatus of
- none -> ack(Acked, State1);
+ none -> ack(Acked, State1),
+ State1;
in_progress -> State1#ch{uncommitted_acks =
Acked ++ State1#ch.uncommitted_acks}
end};
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
- _, State = #ch{writer_pid = WriterPid,
- conn_pid = ConnPid,
- next_tag = DeliveryTag,
- trace_state = TraceState}) ->
+ _, State = #ch{writer_pid = WriterPid,
+ conn_pid = ConnPid,
+ next_tag = DeliveryTag}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, QPid, _MsgId, Redelivered,
+ Msg = {_QName, _QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
- routing_keys = [RoutingKey | _CcRoutes],
- content = Content}}} ->
- State1 = lock_message(not(NoAck),
- ack_record(DeliveryTag, none, Msg),
- State),
- State2 = maybe_incr_stats([{QPid, 1}], case NoAck of
- true -> get_no_ack;
- false -> get
- end, State1),
- State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
- rabbit_trace:tap_trace_out(Msg, TraceState),
+ routing_keys = [RoutingKey | _CcRoutes],
+ content = Content}}} ->
ok = rabbit_writer:send_command(
WriterPid,
- #'basic.get_ok'{delivery_tag = DeliveryTag,
- redelivered = Redelivered,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey,
+ #'basic.get_ok'{delivery_tag = DeliveryTag,
+ redelivered = Redelivered,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey,
message_count = MessageCount},
Content),
- {noreply, State3#ch{next_tag = DeliveryTag + 1}};
+ {noreply, record_sent(none, not(NoAck), Msg, State)};
empty ->
{reply, #'basic.get_empty'{}, State}
end;
@@ -730,7 +726,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
check_read_permitted(QueueName, State),
ActualConsumerTag =
case ConsumerTag of
- <<>> -> rabbit_guid:binstring_guid("amq.ctag");
+ <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
+ "amq.ctag");
Other -> Other
end,
@@ -787,9 +784,8 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
false -> dict:store(QPid, CTags1, QCons)
end
end,
- NewState = demonitor_queue(
- Q, State#ch{consumer_mapping = ConsumerMapping1,
- queue_consumers = QCons1}),
+ NewState = State#ch{consumer_mapping = ConsumerMapping1,
+ queue_consumers = QCons1},
%% In order to ensure that no more messages are sent to
%% the consumer after the cancel_ok has been sent, we get
%% the queue process to send the cancel_ok on our
@@ -960,7 +956,8 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
false -> none
end,
ActualNameBin = case QueueNameBin of
- <<>> -> rabbit_guid:binstring_guid("amq.gen");
+ <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
+ "amq.gen");
Other -> check_name('queue', Other)
end,
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
@@ -1068,19 +1065,26 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
-handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
- uncommitted_acks = TAL}) ->
- State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2,
- State, TMQ))),
- {noreply, maybe_complete_tx(State1#ch{tx_status = committing})};
+handle_method(#'tx.commit'{}, _,
+ State = #ch{uncommitted_message_q = TMQ,
+ uncommitted_acks = TAL,
+ uncommitted_nacks = TNL,
+ limiter = Limiter}) ->
+ State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
+ ack(TAL, State1),
+ lists:foreach(
+ fun({Requeue, Acked}) -> reject(Requeue, Acked, Limiter) end, TNL),
+ {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))};
handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
- uncommitted_acks = TAL}) ->
- UAMQ1 = queue:from_list(lists:usort(TAL ++ queue:to_list(UAMQ))),
+ uncommitted_acks = TAL,
+ uncommitted_nacks = TNL}) ->
+ TNL1 = lists:append([L || {_, L} <- TNL]),
+ UAMQ1 = queue:from_list(lists:usort(TAL ++ TNL1 ++ queue:to_list(UAMQ))),
{reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q = UAMQ1})};
handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
@@ -1111,10 +1115,7 @@ handle_method(#'channel.flow'{active = false}, _,
ok = rabbit_limiter:block(Limiter1),
case consumer_queues(Consumers) of
[] -> {reply, #'channel.flow_ok'{active = false}, State1};
- QPids -> State2 = lists:foldl(fun monitor_queue/2,
- State1#ch{blocking =
- sets:from_list(QPids)},
- QPids),
+ QPids -> State2 = State1#ch{blocking = sets:from_list(QPids)},
ok = rabbit_amqqueue:flush_all(QPids, self()),
{noreply, State2}
end;
@@ -1145,31 +1146,12 @@ consumer_monitor(ConsumerTag,
end.
monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case (not dict:is_key(QPid, QMons) andalso
- queue_monitor_needed(QPid, State)) of
- true -> MRef = erlang:monitor(process, QPid),
- State#ch{queue_monitors = dict:store(QPid, MRef, QMons)};
- false -> State
- end.
-
-demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case (dict:is_key(QPid, QMons) andalso
- not queue_monitor_needed(QPid, State)) of
- true -> true = erlang:demonitor(dict:fetch(QPid, QMons)),
- State#ch{queue_monitors = dict:erase(QPid, QMons)};
- false -> State
+ case sets:is_element(QPid, QMons) of
+ false -> erlang:monitor(process, QPid),
+ State#ch{queue_monitors = sets:add_element(QPid, QMons)};
+ true -> State
end.
-queue_monitor_needed(QPid, #ch{queue_consumers = QCons,
- blocking = Blocking,
- unconfirmed_qm = UQM} = State) ->
- StatsEnabled = rabbit_event:stats_level(
- State, #ch.stats_timer) =:= fine,
- ConsumerMonitored = dict:is_key(QPid, QCons),
- QueueBlocked = sets:is_element(QPid, Blocking),
- ConfirmMonitored = gb_trees:is_defined(QPid, UQM),
- StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored.
-
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
{value, MsgSet} -> gb_sets:to_list(MsgSet);
@@ -1180,14 +1162,9 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
%% the set one by one which which would be inefficient
State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
{Nack, SendFun} =
- case Reason of
- Reason when Reason =:= noproc; Reason =:= noconnection;
- Reason =:= normal; Reason =:= shutdown ->
- {false, fun record_confirms/2};
- {shutdown, _} ->
- {false, fun record_confirms/2};
- _ ->
- {true, fun send_nacks/2}
+ case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> {true, fun send_nacks/2};
+ false -> {false, fun record_confirms/2}
end,
{MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
SendFun(MXs, State2).
@@ -1266,18 +1243,46 @@ basic_return(#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey},
Content).
-reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) ->
+reject(DeliveryTag, Requeue, Multiple,
+ State = #ch{unacked_message_q = UAMQ, tx_status = TxStatus}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
+ State1 = State#ch{unacked_message_q = Remaining},
+ {noreply,
+ case TxStatus of
+ none ->
+ reject(Requeue, Acked, State1#ch.limiter),
+ State1;
+ in_progress ->
+ State1#ch{uncommitted_nacks =
+ [{Requeue, Acked} | State1#ch.uncommitted_nacks]}
+ end}.
+
+reject(Requeue, Acked, Limiter) ->
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
end, ok, Acked),
- ok = notify_limiter(State#ch.limiter, Acked),
- {noreply, State#ch{unacked_message_q = Remaining}}.
-
-ack_record(DeliveryTag, ConsumerTag,
- _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) ->
- {DeliveryTag, ConsumerTag, {QPid, MsgId}}.
+ ok = notify_limiter(Limiter, Acked).
+
+record_sent(ConsumerTag, AckRequired,
+ Msg = {_QName, QPid, MsgId, Redelivered, _Message},
+ State = #ch{unacked_message_q = UAMQ,
+ next_tag = DeliveryTag,
+ trace_state = TraceState}) ->
+ maybe_incr_stats([{QPid, 1}], case {ConsumerTag, AckRequired} of
+ {none, true} -> get;
+ {none, false} -> get_no_ack;
+ {_ , true} -> deliver;
+ {_ , false} -> deliver_no_ack
+ end, State),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State),
+ rabbit_trace:tap_trace_out(Msg, TraceState),
+ UAMQ1 = case AckRequired of
+ true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}},
+ UAMQ);
+ false -> UAMQ
+ end,
+ State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}.
collect_acks(Q, 0, true) ->
{queue:to_list(Q), queue:new()};
@@ -1312,7 +1317,8 @@ ack(Acked, State) ->
maybe_incr_stats(QIncs, ack, State).
new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
- uncommitted_acks = []}.
+ uncommitted_acks = [],
+ uncommitted_nacks = []}.
notify_queues(State = #ch{state = closing}) ->
{ok, State};
@@ -1362,22 +1368,25 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
msg_seq_no = MsgSeqNo},
QNames}, State) ->
- {RoutingRes, DeliveredQPids} = rabbit_router:deliver(QNames, Delivery),
- State1 = process_routing_result(RoutingRes, DeliveredQPids,
- XName, MsgSeqNo, Message, State),
+ {RoutingRes, DeliveredQPids} =
+ rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery),
+ State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids),
+ State2 = process_routing_result(RoutingRes, DeliveredQPids,
+ XName, MsgSeqNo, Message, State1),
maybe_incr_stats([{XName, 1} |
[{{QPid, XName}, 1} ||
- QPid <- DeliveredQPids]], publish, State1).
+ QPid <- DeliveredQPids]], publish, State2),
+ State2.
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
- record_confirm(MsgSeqNo, XName,
- maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
- return_unroutable, State));
+ maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
+ return_unroutable, State),
+ record_confirm(MsgSeqNo, XName, State);
process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_consumers),
- record_confirm(MsgSeqNo, XName,
- maybe_incr_stats([{XName, 1}], return_not_delivered, State));
+ maybe_incr_stats([{XName, 1}], return_not_delivered, State),
+ record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
@@ -1395,15 +1404,10 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
State0#ch{unconfirmed_qm = UQM1};
none ->
UQM1 = gb_trees:insert(QPid, SingletonSet, UQM),
- monitor_queue(QPid, State0#ch{unconfirmed_qm = UQM1})
+ State0#ch{unconfirmed_qm = UQM1}
end
end, State#ch{unconfirmed_mq = UMQ1}, QPids).
-lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
- State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
-lock_message(false, _MsgStruct, State) ->
- State.
-
send_nacks([], State) ->
State;
send_nacks(MXs, State = #ch{tx_status = none}) ->
@@ -1419,13 +1423,12 @@ send_nacks(_, State) ->
send_confirms(State = #ch{tx_status = none, confirmed = []}) ->
State;
send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
- {MsgSeqNos, State1} =
- lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) ->
- {[MsgSeqNo | MSNs],
- maybe_incr_stats([{ExchangeName, 1}], confirm,
- State0)}
- end, {[], State}, lists:append(C)),
- send_confirms(MsgSeqNos, State1 #ch{confirmed = []});
+ MsgSeqNos =
+ lists:foldl(fun ({MsgSeqNo, XName}, MSNs) ->
+ maybe_incr_stats([{XName, 1}], confirm, State),
+ [MsgSeqNo | MSNs]
+ end, [], lists:append(C)),
+ send_confirms(MsgSeqNos, State#ch{confirmed = []});
send_confirms(State) ->
maybe_complete_tx(State).
@@ -1486,6 +1489,7 @@ i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
i(transactional, #ch{tx_status = TE}) -> TE =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
+i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) ->
@@ -1503,28 +1507,26 @@ i(client_flow_blocked, #ch{limiter = Limiter}) ->
i(Item, _) ->
throw({bad_argument, Item}).
+name(#ch{conn_name = ConnName, channel = Channel}) ->
+ list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])).
+
maybe_incr_redeliver_stats(true, QPid, State) ->
maybe_incr_stats([{QPid, 1}], redeliver, State);
-maybe_incr_redeliver_stats(_, _, State) ->
- State.
+maybe_incr_redeliver_stats(_, _, _State) ->
+ ok.
maybe_incr_stats(QXIncs, Measure, State) ->
case rabbit_event:stats_level(State, #ch.stats_timer) of
- fine -> lists:foldl(fun ({QX, Inc}, State0) ->
- incr_stats(QX, Inc, Measure, State0)
- end, State, QXIncs);
- _ -> State
+ fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs];
+ _ -> ok
end.
-incr_stats({QPid, _} = QX, Inc, Measure, State) ->
- update_measures(queue_exchange_stats, QX, Inc, Measure),
- monitor_queue(QPid, State);
-incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) ->
- update_measures(queue_stats, QPid, Inc, Measure),
- monitor_queue(QPid, State);
-incr_stats(X, Inc, Measure, State) ->
- update_measures(exchange_stats, X, Inc, Measure),
- State.
+incr_stats({_, _} = QX, Inc, Measure) ->
+ update_measures(queue_exchange_stats, QX, Inc, Measure);
+incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
+ update_measures(queue_stats, QPid, Inc, Measure);
+incr_stats(X, Inc, Measure) ->
+ update_measures(exchange_stats, X, Inc, Measure).
update_measures(Type, QX, Inc, Measure) ->
Measures = case get({Type, QX}) of
View
2 src/rabbit_command_assembler.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_command_assembler).
View
2 src/rabbit_common.app.src
@@ -1,6 +1,6 @@
{application, rabbit_common,
[{description, "RabbitMQ Common Libraries"},
- {vsn, "2.7.1"},
+ {vsn, "2.8.1"},
{modules, []},
{registered, []},
{env, []},
View
2 src/rabbit_exchange_type.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_exchange_type).
View
110 src/rabbit_framing_amqp_0_8.erl
@@ -13,7 +13,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_framing_amqp_0_8).
-include("rabbit_framing.hrl").
@@ -136,60 +136,60 @@
| #'test.string_ok'{} | #'test.table'{} | #'test.table_ok'{} | #'test.content'{}
| #'test.content_ok'{} )).
-type(amqp_method_field_name() ::
- ( consumer_tag | prefetch_count | message_count | exclusive
- | delivery_tag | string_2 | immediate | cluster_id
- | ticket | content_checksum | ticket | no_local
- | routing_key | frame_max | routing_key | out_of_band
- | routing_key | durable | if_empty | exchange
- | nowait | queue | routing_key | nowait
- | exchange | routing_key | result | ticket
- | routing_key | ticket | requeue | active
- | channel_max | exchange | delivery_tag | queue
- | consumer_tag | integer_3 | operation | consume_rate
- | reply_text | consumer_tag | consumer_count | nowait
- | global | reply_text | nowait | redelivered
- | mandatory | operation | message_count | auto_delete
- | nowait | channel_max | string_op | message_count
- | method_id | delivery_tag | write | mandatory
- | integer_4 | ticket | integer_result | nowait
- | consumer_tag | nowait | nowait | prefetch_size
- | details | passive | ticket | no_local
- | meta_data | ticket | class_id | reply_code
- | ticket | consumer_tag | known_hosts | routing_key
- | exchange | frame_max | exclusive | reply_code
- | response | global | consumer_tag | exclusive
- | delivery_tag | read | no_ack | delivery_tag
- | reply_code | exchange | version_major | if_unused
- | reply_code | identifier | requeue | exclusive
- | active | exclusive | nowait | consumer_tag
- | staged_size | method_id | immediate | realm
- | prefetch_count | type | ticket | routing_key
- | heartbeat | arguments | content_size | ticket
- | passive | reply_code | integer_1 | consumer_tag
- | mechanism | routing_key | table | reply_text
- | delivery_tag | arguments | queue | version_minor
- | consumer_tag | identifier | identifier | reply_code
- | client_properties | ticket | auto_delete | reply_text
- | ticket | queue | arguments | insist
- | host | reply_text | multiple | routing_key
- | ticket | queue | redelivered | delivery_tag
- | durable | response | nowait | exchange
- | string_result | server_properties | routing_key | consumer_tag
- | prefetch_size | message_count | reply_text | immediate
- | arguments | exchange | active | ticket
- | no_ack | integer_2 | no_local | nowait
- | no_ack | consumer_tag | mandatory | integer_op
- | queue | capabilities | requeue | prefetch_size
- | result | exchange | exchange | queue
- | challenge | exchange | locale | exchange
- | redelivered | nowait | requeue | known_hosts
- | queue | consumer_tag | mechanisms | ticket
- | exchange | class_id | heartbeat | exchange
- | exchange | multiple | queue | queue
- | locales | string_1 | consumer_tag | passive
- | if_unused | virtual_host | delivery_tag | prefetch_count
- | queue | internal | consumer_tag | global
- | consumer_tag | dtx_identifier )).
+ ( challenge | ticket | mandatory | routing_key
+ | string_1 | nowait | redelivered | operation
+ | requeue | queue | global | queue
+ | no_ack | nowait | reply_text | internal
+ | delivery_tag | mandatory | consumer_count | integer_3
+ | exclusive | consumer_tag | consumer_tag | routing_key
+ | message_count | prefetch_size | channel_max | exchange
+ | ticket | meta_data | exchange | ticket
+ | frame_max | string_op | consumer_tag | read
+ | durable | nowait | ticket | reply_code
+ | integer_4 | global | details | exclusive
+ | delivery_tag | no_local | arguments | immediate
+ | auto_delete | exchange | consumer_tag | delivery_tag
+ | known_hosts | method_id | requeue | frame_max
+ | exclusive | response | exchange | staged_size
+ | multiple | client_properties | immediate | version_minor
+ | operation | locale | ticket | routing_key
+ | nowait | version_major | ticket | identifier
+ | active | consumer_tag | queue | active
+ | routing_key | table | queue | nowait
+ | channel_max | queue | queue | prefetch_count
+ | identifier | identifier | queue | heartbeat
+ | class_id | reply_text | passive | nowait
+ | exchange | if_unused | ticket | redelivered
+ | reply_text | routing_key | realm | response
+ | reply_code | delivery_tag | reply_code | nowait
+ | ticket | passive | no_local | integer_result
+ | arguments | consumer_tag | host | routing_key
+ | message_count | ticket | exchange | global
+ | ticket | reply_text | no_ack | integer_2
+ | nowait | ticket | mandatory | integer_op
+ | result | exchange | requeue | prefetch_size
+ | consumer_tag | exclusive | content_size | reply_text
+ | mechanism | exchange | exchange | active
+ | reply_code | no_local | server_properties | result
+ | consumer_tag | known_hosts | ticket | exchange
+ | dtx_identifier | consumer_tag | virtual_host | type
+ | exchange | reply_text | multiple | auto_delete
+ | nowait | reply_code | arguments | consumer_tag
+ | mechanisms | arguments | delivery_tag | prefetch_size
+ | consumer_tag | class_id | consumer_tag | message_count
+ | exchange | queue | exclusive | delivery_tag
+ | queue | string_2 | immediate | cluster_id
+ | passive | nowait | content_checksum | prefetch_count
+ | if_unused | string_result | consumer_tag | heartbeat
+ | queue | integer_1 | routing_key | consumer_tag
+ | routing_key | ticket | exchange | if_empty
+ | prefetch_count | exchange | ticket | locales
+ | queue | routing_key | method_id | message_count
+ | ticket | routing_key | reply_code | requeue
+ | out_of_band | write | delivery_tag | durable
+ | delivery_tag | insist | capabilities | redelivered
+ | consume_rate | no_ack | routing_key | nowait
+ | consumer_tag | nowait )).
-type(amqp_property_record() ::
( #'P_connection'{} | #'P_channel'{} | #'P_access'{} | #'P_exchange'{}
| #'P_queue'{} | #'P_basic'{} | #'P_file'{} | #'P_stream'{}
View
76 src/rabbit_framing_amqp_0_9_1.erl
@@ -13,7 +13,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_framing_amqp_0_9_1).
-include("rabbit_framing.hrl").
@@ -115,43 +115,43 @@
| #'tx.commit'{} | #'tx.commit_ok'{} | #'tx.rollback'{} | #'tx.rollback_ok'{}
| #'confirm.select'{} | #'confirm.select_ok'{} )).
-type(amqp_method_field_name() ::
- ( passive | message_count | nowait | message_count
- | queue | active | consumer_tag | routing_key
- | mechanisms | no_ack | arguments | nowait
- | prefetch_count | channel_max | if_empty | ticket
- | nowait | message_count | requeue | nowait
- | locale | active | method_id | queue
- | ticket | ticket | ticket | prefetch_size
- | mandatory | reply_code | nowait | known_hosts
- | server_properties | nowait | frame_max | no_local
- | response | if_unused | arguments | write
- | response | exclusive | nowait | nowait
- | global | requeue | arguments | requeue
- | queue | active | arguments | class_id
- | exchange | arguments | heartbeat | ticket
- | queue | routing_key | queue | consumer_tag
- | type | queue | version_major | reply_text
- | multiple | routing_key | arguments | mechanism
- | redelivered | virtual_host | read | queue
- | message_count | delivery_tag | out_of_band | delivery_tag
- | reply_code | durable | ticket | channel_max
- | method_id | consumer_tag | passive | no_ack
- | version_minor | nowait | consumer_tag | nowait
- | destination | client_properties | reply_code | nowait
- | delivery_tag | insist | exchange | frame_max
- | ticket | ticket | exchange | reply_text
- | passive | immediate | ticket | durable
- | destination | ticket | realm | arguments
- | reply_text | ticket | delivery_tag | channel_id
- | delivery_tag | cluster_id | class_id | source
- | queue | capabilities | exchange | exchange
- | challenge | auto_delete | exchange | consumer_count
- | routing_key | exclusive | exchange | consumer_tag
- | routing_key | heartbeat | routing_key | exclusive
- | exchange | locales | source | routing_key
- | auto_delete | ticket | if_unused | internal
- | ticket | routing_key | redelivered | requeue
- | multiple )).
+ ( nowait | source | routing_key | ticket
+ | arguments | exchange | reply_text | ticket
+ | heartbeat | consumer_tag | queue | source
+ | exchange | redelivered | requeue | type
+ | multiple | version_major | locales | challenge
+ | arguments | ticket | method_id | global
+ | no_ack | out_of_band | arguments | consumer_tag
+ | reply_code | delivery_tag | ticket | message_count
+ | message_count | channel_max | response | ticket
+ | version_minor | queue | prefetch_count | consumer_tag
+ | mechanisms | queue | ticket | queue
+ | ticket | exclusive | reply_code | delivery_tag
+ | delivery_tag | requeue | ticket | locale
+ | routing_key | virtual_host | frame_max | queue
+ | consumer_count | durable | message_count | ticket
+ | exclusive | client_properties | realm | mandatory
+ | requeue | passive | server_properties | queue
+ | channel_id | reply_text | arguments | destination
+ | capabilities | passive | if_unused | heartbeat
+ | queue | consumer_tag | auto_delete | arguments
+ | reply_text | multiple | exchange | routing_key
+ | redelivered | exclusive | routing_key | durable
+ | nowait | ticket | ticket | method_id
+ | reply_code | nowait | routing_key | immediate
+ | cluster_id | if_unused | mechanism | no_ack
+ | internal | routing_key | message_count | passive
+ | destination | class_id | response | if_empty
+ | exchange | active | read | ticket
+ | exchange | routing_key | requeue | exchange
+ | nowait | nowait | consumer_tag | channel_max
+ | nowait | exchange | nowait | active
+ | insist | arguments | auto_delete | queue
+ | delivery_tag | delivery_tag | prefetch_size | nowait
+ | nowait | class_id | known_hosts | frame_max
+ | active | nowait | nowait | no_local
+ | arguments | ticket | exchange | write
+ | routing_key )).
-type(amqp_property_record() ::
( #'P_connection'{} | #'P_channel'{} | #'P_access'{} | #'P_exchange'{}
| #'P_queue'{} | #'P_basic'{} | #'P_tx'{} | #'P_confirm'{} )).
View
2 src/rabbit_heartbeat.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_heartbeat).
View
135 src/rabbit_misc.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(rabbit_misc).
@@ -28,23 +28,26 @@
-export([enable_cover/0, report_cover/0]).
-export([enable_cover/1, report_cover/1]).
-export([start_cover/1]).
+-export([confirm_to_sender/2]).
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
+-export([is_abnormal_termination/1]).
-export([with_user/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
-export([execute_mnesia_transaction/2]).
-export([execute_mnesia_tx_with_tail/1]).
-export([ensure_ok/2]).
--export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]).
+-export([tcp_name/3]).
-export([upmap/2, map_in_order/2]).
-export([table_filter/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
--export([format_stderr/2, with_local_io/1, local_info_msg/2]).
+-export([format/2, format_stderr/2, with_local_io/1, local_info_msg/2]).
-export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
--export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
+-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3,
+ gb_trees_set_insert/3]).
-export([gb_trees_fold/3, gb_trees_foreach/2]).
-export([get_options/2]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
@@ -55,6 +58,7 @@
-export([pget/2, pget/3, pget_or_die/2]).
-export([format_message_queue/2]).
-export([append_rpc_all_nodes/4]).
+-export([multi_call/2]).
-export([quit/1]).
%%----------------------------------------------------------------------------
@@ -108,7 +112,6 @@
(rabbit_framing:amqp_table(), binary(),
rabbit_framing:amqp_field_type(), rabbit_framing:amqp_value())
-> rabbit_framing:amqp_table()).
-
-spec(r/2 :: (rabbit_types:vhost(), K)
-> rabbit_types:r3(rabbit_types:vhost(), K, '_')
when is_subtype(K, atom())).
@@ -131,6 +134,7 @@
(atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A).
-spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A).
-spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]).
+-spec(is_abnormal_termination/1 :: (any()) -> boolean()).
-spec(with_user/2 :: (rabbit_types:username(), thunk(A)) -> A).
-spec(with_user_and_vhost/3 ::
(rabbit_types:username(), rabbit_types:vhost(), thunk(A))
@@ -141,9 +145,6 @@
-spec(execute_mnesia_tx_with_tail/1 ::
(thunk(fun ((boolean()) -> B))) -> B | (fun ((boolean()) -> B))).
-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
--spec(makenode/1 :: ({string(), string()} | string()) -> node()).
--spec(nodeparts/1 :: (node() | string()) -> {string(), string()}).
--spec(cookie_hash/0 :: () -> string()).
-spec(tcp_name/3 ::
(atom(), inet:ip_address(), rabbit_networking:ip_port())
-> atom()).
@@ -155,6 +156,7 @@
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom())
-> 'ok' | 'aborted').
-spec(dirty_dump_log/1 :: (file:filename()) -> ok_or_error()).
+-spec(format/2 :: (string(), [any()]) -> string()).
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(with_local_io/1 :: (fun (() -> A)) -> A).
-spec(local_info_msg/2 :: (string(), [any()]) -> 'ok').
@@ -174,6 +176,7 @@
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
-spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()).
+-spec(gb_trees_set_insert/3 :: (any(), any(), gb_tree()) -> gb_tree()).
-spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A).