Skip to content
Browse files

Updated to 2.6.0

  • Loading branch information...
1 parent 15ad310 commit 93e687d65c23fd85a768554f368bf8d747addc95 @jbrisbin committed Aug 31, 2011
View
13 include/rabbit.hrl
@@ -15,12 +15,12 @@
%%
-record(user, {username,
- is_admin,
+ tags,
auth_backend, %% Module this user came from
impl %% Scratch space for that module
}).
--record(internal_user, {username, password_hash, is_admin}).
+-record(internal_user, {username, password_hash, tags}).
-record(permission, {configure, write, read}).
-record(user_vhost, {username, virtual_host}).
-record(user_permission, {user_vhost, permission}).
@@ -42,11 +42,12 @@
-record(resource, {virtual_host, kind, name}).
--record(exchange, {name, type, durable, auto_delete, internal, arguments}).
+-record(exchange, {name, type, durable, auto_delete, internal, arguments,
+ scratch}).
-record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
- arguments, pid}).
+ arguments, pid, slave_pids, mirror_nodes}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
@@ -67,8 +68,7 @@
is_persistent}).
-record(ssl_socket, {tcp, ssl}).
--record(delivery, {mandatory, immediate, txn, sender, message,
- msg_seq_no}).
+-record(delivery, {mandatory, immediate, sender, message, msg_seq_no}).
-record(amqp_error, {name, explanation = "", method = none}).
-record(event, {type, props, timestamp}).
@@ -86,7 +86,6 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
--define(STATS_INTERVAL, 5000).
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
View
3 include/rabbit_auth_backend_spec.hrl
@@ -22,8 +22,7 @@
{'ok', rabbit_types:user()} |
{'refused', string(), [any()]} |
{'error', any()}).
--spec(check_vhost_access/3 :: (rabbit_types:user(), rabbit_types:vhost(),
- rabbit_access_control:vhost_permission_atom()) ->
+-spec(check_vhost_access/2 :: (rabbit_types:user(), rabbit_types:vhost()) ->
boolean() | {'error', any()}).
-spec(check_resource_access/3 :: (rabbit_types:user(),
rabbit_types:r(atom()),
View
17 include/rabbit_backing_queue_spec.hrl
@@ -26,12 +26,11 @@
fun ((rabbit_types:message_properties())
-> rabbit_types:message_properties())).
-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
--type(sync_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok' | 'error')).
-spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(init/4 :: (rabbit_types:amqqueue(), attempt_recovery(),
- async_callback(), sync_callback()) -> state()).
+-spec(init/3 :: (rabbit_types:amqqueue(), attempt_recovery(),
+ async_callback()) -> state()).
-spec(terminate/2 :: (any(), state()) -> state()).
-spec(delete_and_terminate/2 :: (any(), state()) -> state()).
-spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
@@ -51,14 +50,6 @@
-spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()};
(false, state()) -> {fetch_result(undefined), state()}).
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
--spec(tx_publish/5 :: (rabbit_types:txn(), rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state()) ->
- state()).
--spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()).
--spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}).
--spec(tx_commit/4 ::
- (rabbit_types:txn(), fun (() -> any()),
- message_properties_transformer(), state()) -> {[ack()], state()}).
-spec(requeue/3 :: ([ack()], message_properties_transformer(), state())
-> {[rabbit_guid:guid()], state()}).
-spec(len/1 :: (state()) -> non_neg_integer()).
@@ -71,7 +62,7 @@
-spec(handle_pre_hibernate/1 :: (state()) -> state()).
-spec(status/1 :: (state()) -> [{atom(), any()}]).
-spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()).
--spec(is_duplicate/3 ::
- (rabbit_types:txn(), rabbit_types:basic_message(), state()) ->
+-spec(is_duplicate/2 ::
+ (rabbit_types:basic_message(), state()) ->
{'false'|'published'|'discarded', state()}).
-spec(discard/3 :: (rabbit_types:basic_message(), pid(), state()) -> state()).
View
92 src/gen_server2.erl
@@ -67,6 +67,11 @@
%% module. Note there is no form also encompassing a reply, thus if
%% you wish to reply in handle_call/3 and change the callback module,
%% you need to use gen_server2:reply/2 to issue the reply manually.
+%%
+%% 8) The callback module can optionally implement
+%% format_message_queue/2 which is the equivalent of format_status/2
+%% but where the second argument is specifically the priority_queue
+%% which contains the prioritised message_queue.
%% All modifications are (C) 2009-2011 VMware, Inc.
@@ -593,41 +598,35 @@ adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
CurrentTO1 = Base + Extra,
{backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, RandomState1}.
-in({'$gen_cast', Msg}, GS2State = #gs2_state { prioritise_cast = PC,
- queue = Queue }) ->
- GS2State #gs2_state { queue = priority_queue:in(
- {'$gen_cast', Msg},
- PC(Msg, GS2State), Queue) };
-in({'$gen_call', From, Msg}, GS2State = #gs2_state { prioritise_call = PC,
- queue = Queue }) ->
- GS2State #gs2_state { queue = priority_queue:in(
- {'$gen_call', From, Msg},
- PC(Msg, From, GS2State), Queue) };
-in(Input, GS2State = #gs2_state { prioritise_info = PI, queue = Queue }) ->
- GS2State #gs2_state { queue = priority_queue:in(
- Input, PI(Input, GS2State), Queue) }.
-
-process_msg(Msg,
- GS2State = #gs2_state { parent = Parent,
- name = Name,
- debug = Debug }) ->
- case Msg of
- {system, From, Req} ->
- sys:handle_system_msg(
- Req, From, Parent, ?MODULE, Debug,
- GS2State);
- %% gen_server puts Hib on the end as the 7th arg, but that
- %% version of the function seems not to be documented so
- %% leaving out for now.
- {'EXIT', Parent, Reason} ->
- terminate(Reason, Msg, GS2State);
- _Msg when Debug =:= [] ->
- handle_msg(Msg, GS2State);
- _Msg ->
- Debug1 = sys:handle_debug(Debug, fun print_event/3,
- Name, {in, Msg}),
- handle_msg(Msg, GS2State #gs2_state { debug = Debug1 })
- end.
+in({'$gen_cast', Msg} = Input,
+ GS2State = #gs2_state { prioritise_cast = PC }) ->
+ in(Input, PC(Msg, GS2State), GS2State);
+in({'$gen_call', From, Msg} = Input,
+ GS2State = #gs2_state { prioritise_call = PC }) ->
+ in(Input, PC(Msg, From, GS2State), GS2State);
+in({'EXIT', Parent, _R} = Input, GS2State = #gs2_state { parent = Parent }) ->
+ in(Input, infinity, GS2State);
+in({system, _From, _Req} = Input, GS2State) ->
+ in(Input, infinity, GS2State);
+in(Input, GS2State = #gs2_state { prioritise_info = PI }) ->
+ in(Input, PI(Input, GS2State), GS2State).
+
+in(Input, Priority, GS2State = #gs2_state { queue = Queue }) ->
+ GS2State # gs2_state { queue = priority_queue:in(Input, Priority, Queue) }.
+
+process_msg({system, From, Req},
+ GS2State = #gs2_state { parent = Parent, debug = Debug }) ->
+ %% gen_server puts Hib on the end as the 7th arg, but that version
+ %% of the fun seems not to be documented so leaving out for now.
+ sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, GS2State);
+process_msg({'EXIT', Parent, Reason} = Msg,
+ GS2State = #gs2_state { parent = Parent }) ->
+ terminate(Reason, Msg, GS2State);
+process_msg(Msg, GS2State = #gs2_state { debug = [] }) ->
+ handle_msg(Msg, GS2State);
+process_msg(Msg, GS2State = #gs2_state { name = Name, debug = Debug }) ->
+ Debug1 = sys:handle_debug(Debug, fun print_event/3, Name, {in, Msg}),
+ handle_msg(Msg, GS2State #gs2_state { debug = Debug1 }).
%%% ---------------------------------------------------
%%% Send/recive functions
@@ -1161,17 +1160,22 @@ format_status(Opt, StatusData) ->
end,
Header = lists:concat(["Status for generic server ", NameTag]),
Log = sys:get_debug(log, Debug, []),
- Specfic =
- case erlang:function_exported(Mod, format_status, 2) of
- true -> case catch Mod:format_status(Opt, [PDict, State]) of
- {'EXIT', _} -> [{data, [{"State", State}]}];
- Else -> Else
- end;
- _ -> [{data, [{"State", State}]}]
- end,
+ Specfic = callback(Mod, format_status, [Opt, [PDict, State]],
+ fun () -> [{data, [{"State", State}]}] end),
+ Messages = callback(Mod, format_message_queue, [Opt, Queue],
+ fun () -> priority_queue:to_list(Queue) end),
[{header, Header},
{data, [{"Status", SysState},
{"Parent", Parent},
{"Logged events", Log},
- {"Queued messages", priority_queue:to_list(Queue)}]} |
+ {"Queued messages", Messages}]} |
Specfic].
+
+callback(Mod, FunName, Args, DefaultThunk) ->
+ case erlang:function_exported(Mod, FunName, length(Args)) of
+ true -> case catch apply(Mod, FunName, Args) of
+ {'EXIT', _} -> DefaultThunk();
+ Success -> Success
+ end;
+ false -> DefaultThunk()
+ end.
View
194 src/rabbit_amqqueue.erl
@@ -20,20 +20,20 @@
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
- stat/1, deliver/2, requeue/3, ack/4, reject/4]).
--export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
+ stat/1, deliver/2, requeue/3, ack/3, reject/4]).
+-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
+-export([force_event_refresh/0]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
-export([notify_sent/2, unblock/2, flush_all/2]).
--export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
+-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
+-export([store_queue/1]).
+
%% internal
--export([internal_declare/2, internal_delete/1,
- run_backing_queue/3, run_backing_queue_async/3,
- sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2,
- set_maximum_since_use/2, maybe_expire/1, drop_expired/1,
- emit_stats/1]).
+-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
+ set_ram_duration_target/2, set_maximum_since_use/2]).
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
@@ -80,6 +80,7 @@
-> 'ok' | rabbit_types:channel_exit()).
-spec(with_exclusive_access_or_die/3 ::
(name(), pid(), qfun(A)) -> A | rabbit_types:channel_exit()).
+-spec(list/0 :: () -> [rabbit_types:amqqueue()]).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (rabbit_types:amqqueue()) -> rabbit_types:infos()).
@@ -89,6 +90,7 @@
-spec(info_all/1 :: (rabbit_types:vhost()) -> [rabbit_types:infos()]).
-spec(info_all/2 :: (rabbit_types:vhost(), rabbit_types:info_keys())
-> [rabbit_types:infos()]).
+-spec(force_event_refresh/0 :: () -> 'ok').
-spec(consumers/1 ::
(rabbit_types:amqqueue())
-> [{pid(), rabbit_types:ctag(), boolean()}]).
@@ -99,7 +101,6 @@
-spec(stat/1 ::
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
--spec(emit_stats/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(delete/3 ::
(rabbit_types:amqqueue(), 'false', 'false')
@@ -115,19 +116,16 @@
-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
-spec(deliver/2 :: (pid(), rabbit_types:delivery()) -> boolean()).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
--spec(ack/4 ::
- (pid(), rabbit_types:maybe(rabbit_types:txn()), [msg_id()], pid())
- -> 'ok').
+-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
--spec(commit_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> ok_or_errors()).
--spec(rollback_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> 'ok').
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
--spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
+-spec(limit_all/3 :: ([pid()], pid(), rabbit_limiter:token()) ->
+ ok_or_errors()).
-spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(basic_consume/7 ::
- (rabbit_types:amqqueue(), boolean(), pid(), pid() | 'undefined',
- rabbit_types:ctag(), boolean(), any())
+ (rabbit_types:amqqueue(), boolean(), pid(),
+ rabbit_limiter:token(), rabbit_types:ctag(), boolean(), any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
@@ -145,14 +143,8 @@
-spec(run_backing_queue/3 ::
(pid(), atom(),
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
--spec(run_backing_queue_async/3 ::
- (pid(), atom(),
- (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
--spec(sync_timeout/1 :: (pid()) -> 'ok').
--spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
--spec(maybe_expire/1 :: (pid()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (name(), pid()) -> rabbit_types:amqqueue()).
@@ -191,18 +183,21 @@ find_durable_queues() ->
end).
recover_durable_queues(DurableQueues) ->
- Qs = [start_queue_process(Q) || Q <- DurableQueues],
+ Qs = [start_queue_process(node(), Q) || Q <- DurableQueues],
[QName || Q = #amqqueue{name = QName, pid = Pid} <- Qs,
gen_server2:call(Pid, {init, true}, infinity) == {new, Q}].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
- Q = start_queue_process(#amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
- exclusive_owner = Owner,
- pid = none}),
+ {Node, MNodes} = determine_queue_nodes(Args),
+ Q = start_queue_process(Node, #amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
+ exclusive_owner = Owner,
+ pid = none,
+ slave_pids = [],
+ mirror_nodes = MNodes}),
case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of
not_found -> rabbit_misc:not_found(QueueName);
Q1 -> Q1
@@ -233,15 +228,31 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
end).
store_queue(Q = #amqqueue{durable = true}) ->
- ok = mnesia:write(rabbit_durable_queue, Q, write),
+ ok = mnesia:write(rabbit_durable_queue, Q#amqqueue{slave_pids = []}, write),
ok = mnesia:write(rabbit_queue, Q, write),
ok;
store_queue(Q = #amqqueue{durable = false}) ->
ok = mnesia:write(rabbit_queue, Q, write),
ok.
-start_queue_process(Q) ->
- {ok, Pid} = rabbit_amqqueue_sup:start_child([Q]),
+determine_queue_nodes(Args) ->
+ Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>),
+ PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>),
+ case {Policy, PolicyParams} of
+ {{_Type, <<"nodes">>}, {array, Nodes}} ->
+ case [list_to_atom(binary_to_list(Node)) ||
+ {longstr, Node} <- Nodes] of
+ [Node] -> {Node, undefined};
+ [First | Rest] -> {First, Rest}
+ end;
+ {{_Type, <<"all">>}, _} ->
+ {node(), all};
+ _ ->
+ {node(), undefined}
+ end.
+
+start_queue_process(Node, Q) ->
+ {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]),
Q#amqqueue{pid = Pid}.
add_default_binding(#amqqueue{name = QueueName}) ->
@@ -257,8 +268,13 @@ lookup(Name) ->
with(Name, F, E) ->
case lookup(Name) of
- {ok, Q} -> rabbit_misc:with_exit_handler(E, fun () -> F(Q) end);
- {error, not_found} -> E()
+ {ok, Q = #amqqueue{slave_pids = []}} ->
+ rabbit_misc:with_exit_handler(E, fun () -> F(Q) end);
+ {ok, Q} ->
+ E1 = fun () -> timer:sleep(25), with(Name, F, E) end,
+ rabbit_misc:with_exit_handler(E1, fun () -> F(Q) end);
+ {error, not_found} ->
+ E()
end.
with(Name, F) ->
@@ -295,31 +311,61 @@ with_exclusive_access_or_die(Name, ReaderPid, F) ->
assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
RequiredArgs) ->
- rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName,
- [<<"x-expires">>, <<"x-message-ttl">>]).
+ rabbit_misc:assert_args_equivalence(
+ Args, RequiredArgs, QueueName,
+ [<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]).
check_declare_arguments(QueueName, Args) ->
- [case Fun(rabbit_misc:table_lookup(Args, Key)) of
+ [case Fun(rabbit_misc:table_lookup(Args, Key), Args) of
ok -> ok;
{error, Error} -> rabbit_misc:protocol_error(
precondition_failed,
- "invalid arg '~s' for ~s: ~w",
+ "invalid arg '~s' for ~s: ~255p",
[Key, rabbit_misc:rs(QueueName), Error])
end || {Key, Fun} <-
- [{<<"x-expires">>, fun check_integer_argument/1},
- {<<"x-message-ttl">>, fun check_integer_argument/1}]],
+ [{<<"x-expires">>, fun check_integer_argument/2},
+ {<<"x-message-ttl">>, fun check_integer_argument/2},
+ {<<"x-ha-policy">>, fun check_ha_policy_argument/2}]],
ok.
-check_integer_argument(undefined) ->
+check_integer_argument(undefined, _Args) ->
ok;
-check_integer_argument({Type, Val}) when Val > 0 ->
+check_integer_argument({Type, Val}, _Args) when Val > 0 ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
true -> ok;
false -> {error, {unacceptable_type, Type}}
end;
-check_integer_argument({_Type, Val}) ->
+check_integer_argument({_Type, Val}, _Args) ->
{error, {value_zero_or_less, Val}}.
+check_ha_policy_argument(undefined, _Args) ->
+ ok;
+check_ha_policy_argument({longstr, <<"all">>}, _Args) ->
+ ok;
+check_ha_policy_argument({longstr, <<"nodes">>}, Args) ->
+ case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of
+ undefined ->
+ {error, {require, 'x-ha-policy-params'}};
+ {array, []} ->
+ {error, {require_non_empty_list_of_nodes_for_ha}};
+ {array, Ary} ->
+ case lists:all(fun ({longstr, _Node}) -> true;
+ (_ ) -> false
+ end, Ary) of
+ true -> ok;
+ false -> {error, {require_node_list_as_longstrs_for_ha, Ary}}
+ end;
+ {Type, _} ->
+ {error, {ha_nodes_policy_params_not_array_of_longstr, Type}}
+ end;
+check_ha_policy_argument({longstr, Policy}, _Args) ->
+ {error, {invalid_ha_policy, Policy}};
+check_ha_policy_argument({Type, _}, _Args) ->
+ {error, {unacceptable_type, Type}}.
+
+list() ->
+ mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
+
list(VHostPath) ->
mnesia:dirty_match_object(
rabbit_queue,
@@ -342,6 +388,10 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
+force_event_refresh() ->
+ [gen_server2:cast(Q#amqqueue.pid, force_event_refresh) || Q <- list()],
+ ok.
+
consumers(#amqqueue{ pid = QPid }) ->
delegate_call(QPid, consumers).
@@ -360,9 +410,6 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) ->
delegate_call(QPid, stat).
-emit_stats(#amqqueue{pid = QPid}) ->
- delegate_cast(QPid, emit_stats).
-
delete_immediately(#amqqueue{ pid = QPid }) ->
gen_server2:cast(QPid, delete_immediately).
@@ -383,39 +430,28 @@ deliver(QPid, Delivery) ->
requeue(QPid, MsgIds, ChPid) ->
delegate_call(QPid, {requeue, MsgIds, ChPid}).
-ack(QPid, Txn, MsgIds, ChPid) ->
- delegate_cast(QPid, {ack, Txn, MsgIds, ChPid}).
+ack(QPid, MsgIds, ChPid) ->
+ delegate_cast(QPid, {ack, MsgIds, ChPid}).
reject(QPid, MsgIds, Requeue, ChPid) ->
delegate_cast(QPid, {reject, MsgIds, Requeue, ChPid}).
-commit_all(QPids, Txn, ChPid) ->
- safe_delegate_call_ok(
- fun (QPid) -> gen_server2:call(QPid, {commit, Txn, ChPid}, infinity) end,
- QPids).
-
-rollback_all(QPids, Txn, ChPid) ->
- delegate:invoke_no_result(
- QPids, fun (QPid) -> gen_server2:cast(QPid, {rollback, Txn, ChPid}) end).
-
notify_down_all(QPids, ChPid) ->
safe_delegate_call_ok(
fun (QPid) -> gen_server2:call(QPid, {notify_down, ChPid}, infinity) end,
QPids).
-limit_all(QPids, ChPid, LimiterPid) ->
+limit_all(QPids, ChPid, Limiter) ->
delegate:invoke_no_result(
- QPids, fun (QPid) ->
- gen_server2:cast(QPid, {limit, ChPid, LimiterPid})
- end).
+ QPids, fun (QPid) -> gen_server2:cast(QPid, {limit, ChPid, Limiter}) end).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
delegate_call(QPid, {basic_get, ChPid, NoAck}).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid,
+basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter,
ConsumerTag, ExclusiveConsume, OkMsg) ->
delegate_call(QPid, {basic_consume, NoAck, ChPid,
- LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}).
+ Limiter, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
ok = delegate_call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
@@ -448,33 +484,19 @@ internal_delete(QueueName) ->
end).
run_backing_queue(QPid, Mod, Fun) ->
- gen_server2:call(QPid, {run_backing_queue, Mod, Fun}, infinity).
-
-run_backing_queue_async(QPid, Mod, Fun) ->
gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).
-sync_timeout(QPid) ->
- gen_server2:cast(QPid, sync_timeout).
-
-update_ram_duration(QPid) ->
- gen_server2:cast(QPid, update_ram_duration).
-
set_ram_duration_target(QPid, Duration) ->
gen_server2:cast(QPid, {set_ram_duration_target, Duration}).
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
-maybe_expire(QPid) ->
- gen_server2:cast(QPid, maybe_expire).
-
-drop_expired(QPid) ->
- gen_server2:cast(QPid, drop_expired).
-
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid}
+ #amqqueue{name = QueueName, pid = Pid,
+ slave_pids = []}
<- mnesia:table(rabbit_queue),
node(Pid) == Node])),
rabbit_binding:process_deletions(
@@ -487,11 +509,13 @@ delete_queue(QueueName) ->
rabbit_binding:remove_transient_for_destination(QueueName).
pseudo_queue(QueueName, Pid) ->
- #amqqueue{name = QueueName,
- durable = false,
- auto_delete = false,
- arguments = [],
- pid = Pid}.
+ #amqqueue{name = QueueName,
+ durable = false,
+ auto_delete = false,
+ arguments = [],
+ pid = Pid,
+ slave_pids = [],
+ mirror_nodes = undefined}.
safe_delegate_call_ok(F, Pids) ->
case delegate:invoke(Pids, fun (Pid) ->
View
8 src/rabbit_auth_backend.erl
@@ -36,17 +36,13 @@ behaviour_info(callbacks) ->
%% Client failed authentication. Log and die.
{check_user_login, 2},
- %% Given #user, vhost path and permission, can a user access a vhost?
- %% Permission is read - learn of the existence of (only relevant for
- %% management plugin)
- %% or write - log in
- %%
+ %% Given #user and vhost, can a user log in to a vhost?
%% Possible responses:
%% true
%% false
%% {error, Error}
%% Something went wrong. Log and die.
- {check_vhost_access, 3},
+ {check_vhost_access, 2},
%% Given #user, resource and permission, can a user access a resource?
%%
View
21 src/rabbit_backing_queue.erl
@@ -44,9 +44,7 @@ behaviour_info(callbacks) ->
%% makes it useful for passing messages back into the backing
%% queue, especially as the backing queue does not have
%% control of its own mailbox.
- %% 4. a synchronous callback. Same as the asynchronous callback
- %% but waits for completion and returns 'error' on error.
- {init, 4},
+ {init, 3},
%% Called on queue shutdown when queue isn't being deleted.
{terminate, 2},
@@ -107,21 +105,6 @@ behaviour_info(callbacks) ->
%% about. Must return 1 msg_id per Ack, in the same order as Acks.
{ack, 2},
- %% A publish, but in the context of a transaction.
- {tx_publish, 5},
-
- %% Acks, but in the context of a transaction.
- {tx_ack, 3},
-
- %% Undo anything which has been done in the context of the
- %% specified transaction.
- {tx_rollback, 2},
-
- %% Commit a transaction. The Fun passed in must be called once
- %% the messages have really been commited. This CPS permits the
- %% possibility of commit coalescing.
- {tx_commit, 4},
-
%% Reinsert messages into the queue which have already been
%% delivered and were pending acknowledgement.
{requeue, 3},
@@ -175,7 +158,7 @@ behaviour_info(callbacks) ->
%% the BQ to signal that it's already seen this message (and in
%% what capacity - i.e. was it published previously or discarded
%% previously) and thus the message should be dropped.
- {is_duplicate, 3},
+ {is_duplicate, 2},
%% Called to inform the BQ about messages which have reached the
%% queue, but are not going to be further passed to BQ for some
View
35 src/rabbit_basic.erl
@@ -18,8 +18,8 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/1, message/3, message/4, properties/1, delivery/5]).
--export([publish/4, publish/7]).
+-export([publish/1, message/3, message/4, properties/1, delivery/4]).
+-export([publish/4, publish/6]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -37,9 +37,8 @@
-spec(publish/1 ::
(rabbit_types:delivery()) -> publish_result()).
--spec(delivery/5 ::
- (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
- rabbit_types:message(), undefined | integer()) ->
+-spec(delivery/4 ::
+ (boolean(), boolean(), rabbit_types:message(), undefined | integer()) ->
rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
@@ -53,10 +52,9 @@
-spec(publish/4 ::
(exchange_input(), rabbit_router:routing_key(), properties_input(),
body_input()) -> publish_result()).
--spec(publish/7 ::
+-spec(publish/6 ::
(exchange_input(), rabbit_router:routing_key(), boolean(), boolean(),
- rabbit_types:maybe(rabbit_types:txn()), properties_input(),
- body_input()) -> publish_result()).
+ properties_input(), body_input()) -> publish_result()).
-spec(build_content/2 :: (rabbit_framing:amqp_property_record(),
binary() | [binary()]) -> rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
@@ -73,9 +71,9 @@ publish(Delivery = #delivery{
Other -> Other
end.
-delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) ->
- #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
- sender = self(), message = Message, msg_seq_no = MsgSeqNo}.
+delivery(Mandatory, Immediate, Message, MsgSeqNo) ->
+ #delivery{mandatory = Mandatory, immediate = Immediate, sender = self(),
+ message = Message, msg_seq_no = MsgSeqNo}.
build_content(Properties, BodyBin) when is_binary(BodyBin) ->
build_content(Properties, [BodyBin]);
@@ -157,24 +155,23 @@ indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1).
%% Convenience function, for avoiding round-trips in calls across the
%% erlang distributed network.
publish(Exchange, RoutingKeyBin, Properties, Body) ->
- publish(Exchange, RoutingKeyBin, false, false, none, Properties,
- Body).
+ publish(Exchange, RoutingKeyBin, false, false, Properties, Body).
%% Convenience function, for avoiding round-trips in calls across the
%% erlang distributed network.
-publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Txn,
- Props, Body) ->
- publish(X, delivery(Mandatory, Immediate, Txn,
+publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) ->
+ publish(X, delivery(Mandatory, Immediate,
message(XName, RKey, properties(Props), Body),
undefined));
-publish(XName, RKey, Mandatory, Immediate, Txn, Props, Body) ->
+publish(XName, RKey, Mandatory, Immediate, Props, Body) ->
case rabbit_exchange:lookup(XName) of
- {ok, X} -> publish(X, RKey, Mandatory, Immediate, Txn, Props, Body);
+ {ok, X} -> publish(X, RKey, Mandatory, Immediate, Props, Body);
Err -> Err
end.
publish(X, Delivery) ->
- {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery),
+ {RoutingRes, DeliveredQPids} =
+ rabbit_router:deliver(rabbit_exchange:route(X, Delivery), Delivery),
{ok, RoutingRes, DeliveredQPids}.
is_message_persistent(#content{properties = #'P_basic'{
View
474 src/rabbit_channel.erl
@@ -23,15 +23,18 @@
-export([start_link/10, do/2, do/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
--export([refresh_config_all/0, emit_stats/1, ready_for_close/1]).
+-export([refresh_config_local/0, ready_for_close/1]).
+-export([force_event_refresh/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
- prioritise_cast/2]).
+ prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
+%% Internal
+-export([list_local/0]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
- limiter_pid, start_limiter_fun, transaction_id, tx_participants,
- next_tag, uncommitted_ack_q, unacked_message_q,
+ limiter, tx_status, next_tag,
+ unacked_message_q, uncommitted_message_q, uncommitted_ack_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
@@ -46,6 +49,7 @@
consumer_count,
messages_unacknowledged,
messages_unconfirmed,
+ messages_uncommitted,
acks_uncommitted,
prefetch_count,
client_flow_blocked]).
@@ -70,8 +74,7 @@
-spec(start_link/10 ::
(channel_number(), pid(), pid(), pid(), rabbit_types:protocol(),
rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(),
- pid(), fun ((non_neg_integer()) -> rabbit_types:ok(pid()))) ->
- rabbit_types:ok_pid_or_error()).
+ pid(), rabbit_limiter:token()) -> rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
@@ -84,24 +87,25 @@
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok').
-spec(list/0 :: () -> [pid()]).
+-spec(list_local/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
-spec(info_all/0 :: () -> [rabbit_types:infos()]).
-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
--spec(refresh_config_all/0 :: () -> 'ok').
--spec(emit_stats/1 :: (pid()) -> 'ok').
+-spec(refresh_config_local/0 :: () -> 'ok').
-spec(ready_for_close/1 :: (pid()) -> 'ok').
+-spec(force_event_refresh/0 :: () -> 'ok').
-endif.
%%----------------------------------------------------------------------------
start_link(Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
- Capabilities, CollectorPid, StartLimiterFun) ->
+ Capabilities, CollectorPid, Limiter) ->
gen_server2:start_link(
?MODULE, [Channel, ReaderPid, WriterPid, ConnPid, Protocol, User,
- VHost, Capabilities, CollectorPid, StartLimiterFun], []).
+ VHost, Capabilities, CollectorPid, Limiter], []).
do(Pid, Method) ->
do(Pid, Method, none).
@@ -128,6 +132,10 @@ confirm(Pid, MsgSeqNos) ->
gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
list() ->
+ rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(),
+ rabbit_channel, list_local, []).
+
+list_local() ->
pg_local:get_members(rabbit_channels).
info_keys() -> ?INFO_KEYS.
@@ -147,21 +155,22 @@ info_all() ->
info_all(Items) ->
rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
-refresh_config_all() ->
+refresh_config_local() ->
rabbit_misc:upmap(
- fun (C) -> gen_server2:call(C, refresh_config) end, list()),
+ fun (C) -> gen_server2:call(C, refresh_config) end, list_local()),
ok.
-emit_stats(Pid) ->
- gen_server2:cast(Pid, emit_stats).
-
ready_for_close(Pid) ->
gen_server2:cast(Pid, ready_for_close).
+force_event_refresh() ->
+ [gen_server2:cast(C, force_event_refresh) || C <- list()],
+ ok.
+
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
- Capabilities, CollectorPid, StartLimiterFun]) ->
+ Capabilities, CollectorPid, Limiter]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
StatsTimer = rabbit_event:init_stats_timer(),
@@ -171,13 +180,12 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
reader_pid = ReaderPid,
writer_pid = WriterPid,
conn_pid = ConnPid,
- limiter_pid = undefined,
- start_limiter_fun = StartLimiterFun,
- transaction_id = none,
- tx_participants = sets:new(),
+ limiter = Limiter,
+ tx_status = none,
next_tag = 1,
- uncommitted_ack_q = queue:new(),
unacked_message_q = queue:new(),
+ uncommitted_message_q = queue:new(),
+ uncommitted_ack_q = queue:new(),
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
@@ -195,7 +203,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
trace_state = rabbit_trace:init(VHost)},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
- fun() -> internal_emit_stats(State) end),
+ fun() -> emit_stats(State) end),
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -208,11 +216,16 @@ prioritise_call(Msg, _From, _State) ->
prioritise_cast(Msg, _State) ->
case Msg of
- emit_stats -> 7;
{confirm, _MsgSeqNos, _QPid} -> 5;
_ -> 0
end.
+prioritise_info(Msg, _State) ->
+ case Msg of
+ emit_stats -> 7;
+ _ -> 0
+ end.
+
handle_call(flush, _From, State) ->
reply(ok, State);
@@ -286,27 +299,30 @@ handle_cast({deliver, ConsumerTag, AckRequired,
exchange = ExchangeName#resource.name,
routing_key = RoutingKey},
rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
-
- maybe_incr_stats([{QPid, 1}],
- case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end, State),
+ maybe_incr_stats([{QPid, 1}], case AckRequired of
+ true -> deliver;
+ false -> deliver_no_ack
+ end, State),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State),
rabbit_trace:tap_trace_out(Msg, TraceState),
noreply(State1#ch{next_tag = DeliveryTag + 1});
-handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
- internal_emit_stats(State),
- noreply([ensure_stats_timer],
- State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)});
+handle_cast(force_event_refresh, State) ->
+ rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
+ noreply(State);
handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
handle_info(timeout, State) ->
noreply(State);
+handle_info(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
+ emit_stats(State),
+ noreply([ensure_stats_timer],
+ State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)});
+
handle_info({'DOWN', MRef, process, QPid, Reason},
State = #ch{consumer_monitors = ConsumerMonitors}) ->
noreply(
@@ -322,16 +338,13 @@ handle_info({'EXIT', _Pid, Reason}, State) ->
handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) ->
ok = clear_permission_cache(),
- rabbit_event:if_enabled(StatsTimer,
- fun () ->
- internal_emit_stats(
- State, [{idle_since, now()}])
- end),
+ rabbit_event:if_enabled(
+ StatsTimer, fun () -> emit_stats(State, [{idle_since, now()}]) end),
StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer),
{hibernate, State#ch{stats_timer = StatsTimer1}}.
terminate(Reason, State) ->
- {Res, _State1} = rollback_and_notify(State),
+ {Res, _State1} = notify_queues(State),
case Reason of
normal -> ok = Res;
shutdown -> ok = Res;
@@ -344,6 +357,8 @@ terminate(Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+
%%---------------------------------------------------------------------------
reply(Reply, NewState) -> reply(Reply, [], NewState).
@@ -368,8 +383,7 @@ next_state(Mask, State) ->
ensure_stats_timer(State = #ch{stats_timer = StatsTimer}) ->
ChPid = self(),
State#ch{stats_timer = rabbit_event:ensure_stats_timer(
- StatsTimer,
- fun() -> emit_stats(ChPid) end)}.
+ StatsTimer, ChPid, emit_stats)}.
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -386,8 +400,8 @@ send_exception(Reason, State = #ch{protocol = Protocol,
rabbit_binary_generator:map_exception(Channel, Reason, Protocol),
rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
[ConnPid, Channel, Reason]),
- %% something bad's happened: rollback_and_notify may not be 'ok'
- {_Result, State1} = rollback_and_notify(State),
+ %% something bad's happened: notify_queues may not be 'ok'
+ {_Result, State1} = notify_queues(State),
case CloseChannel of
Channel -> ok = rabbit_writer:send_command(WriterPid, CloseMethod),
{noreply, State1};
@@ -538,17 +552,13 @@ process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ,
fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) ->
case gb_trees:lookup(MsgSeqNo, UMQ0) of
{value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
- Acc, Nack, State);
+ Acc, Nack);
none -> Acc
end
end, {[], UMQ, UQM}, MsgSeqNos),
{MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}.
-remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack,
- State) ->
- %% these confirms will be emitted even when a queue dies, but that
- %% should be fine, since the queue stats get erased immediately
- maybe_incr_stats([{{QPid, XName}, 1}], confirm, State),
+remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack) ->
UQM1 = case gb_trees:lookup(QPid, UQM) of
{value, MsgSeqNos} ->
MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
@@ -589,10 +599,19 @@ handle_method(_Method, _, State = #ch{state = closing}) ->
{noreply, State};
handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
- {ok, State1} = rollback_and_notify(State),
+ {ok, State1} = notify_queues(State),
ReaderPid ! {channel_closing, self()},
{noreply, State1};
+%% Even though the spec prohibits the client from sending commands
+%% while waiting for the reply to a synchronous command, we generally
+%% do allow this...except in the case of a pending tx.commit, where
+%% it could wreak havoc.
+handle_method(_Method, _, #ch{tx_status = TxStatus})
+ when TxStatus =/= none andalso TxStatus =/= in_progress ->
+ rabbit_misc:protocol_error(
+ channel_error, "unexpected command while processing 'tx.commit'", []);
+
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
@@ -601,7 +620,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
mandatory = Mandatory,
immediate = Immediate},
Content, State = #ch{virtual_host = VHostPath,
- transaction_id = TxnKey,
+ tx_status = TxStatus,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
@@ -613,29 +632,24 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
check_user_id_header(DecodedContent#content.properties, State),
{MsgSeqNo, State1} =
- case ConfirmEnabled of
- false -> {undefined, State};
- true -> SeqNo = State#ch.publish_seqno,
- {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
+ case {TxStatus, ConfirmEnabled} of
+ {none, false} -> {undefined, State};
+ {_, _} -> SeqNo = State#ch.publish_seqno,
+ {SeqNo, State#ch{publish_seqno = SeqNo + 1}}
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
rabbit_trace:tap_trace_in(Message, TraceState),
- {RoutingRes, DeliveredQPids} =
- rabbit_exchange:publish(
- Exchange,
- rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
- MsgSeqNo)),
- State2 = process_routing_result(RoutingRes, DeliveredQPids,
- ExchangeName, MsgSeqNo, Message,
- State1),
- maybe_incr_stats([{ExchangeName, 1} |
- [{{QPid, ExchangeName}, 1} ||
- QPid <- DeliveredQPids]], publish, State2),
- {noreply, case TxnKey of
- none -> State2;
- _ -> add_tx_participants(DeliveredQPids, State2)
- end};
+ Delivery = rabbit_basic:delivery(Mandatory, Immediate, Message,
+ MsgSeqNo),
+ QNames = rabbit_exchange:route(Exchange, Delivery),
+ {noreply,
+ case TxStatus of
+ none -> deliver_to_queues({Delivery, QNames}, State1);
+ in_progress -> TMQ = State1#ch.uncommitted_message_q,
+ NewTMQ = queue:in({Delivery, QNames}, TMQ),
+ State1#ch{uncommitted_message_q = NewTMQ}
+ end};
{error, Reason} ->
rabbit_misc:protocol_error(precondition_failed,
"invalid message: ~p", [Reason])
@@ -649,22 +663,16 @@ handle_method(#'basic.nack'{delivery_tag = DeliveryTag,
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
- _, State = #ch{transaction_id = TxnKey,
- unacked_message_q = UAMQ}) ->
+ _, State = #ch{unacked_message_q = UAMQ,
+ tx_status = TxStatus}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
- QIncs = ack(TxnKey, Acked),
- Participants = [QPid || {QPid, _} <- QIncs],
- maybe_incr_stats(QIncs, ack, State),
- {noreply, case TxnKey of
- none -> ok = notify_limiter(State#ch.limiter_pid, Acked),
- State#ch{unacked_message_q = Remaining};
- _ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q,
- Acked),
- add_tx_participants(
- Participants,
- State#ch{unacked_message_q = Remaining,
- uncommitted_ack_q = NewUAQ})
- end};
+ State1 = State#ch{unacked_message_q = Remaining},
+ {noreply,
+ case TxStatus of
+ none -> ack(Acked, State1);
+ in_progress -> NewTAQ = queue:join(State1#ch.uncommitted_ack_q, Acked),
+ State1#ch{uncommitted_ack_q = NewTAQ}
+ end};
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
@@ -685,11 +693,11 @@ handle_method(#'basic.get'{queue = QueueNameBin,
State1 = lock_message(not(NoAck),
ack_record(DeliveryTag, none, Msg),
State),
- maybe_incr_stats([{QPid, 1}],
- case NoAck of
- true -> get_no_ack;
- false -> get
- end, State),
+ maybe_incr_stats([{QPid, 1}], case NoAck of
+ true -> get_no_ack;
+ false -> get
+ end, State),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State),
rabbit_trace:tap_trace_out(Msg, TraceState),
ok = rabbit_writer:send_command(
WriterPid,
@@ -711,7 +719,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
exclusive = ExclusiveConsume,
nowait = NoWait},
_, State = #ch{conn_pid = ConnPid,
- limiter_pid = LimiterPid,
+ limiter = Limiter,
consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
@@ -730,7 +738,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) ->
{rabbit_amqqueue:basic_consume(
- Q, NoAck, self(), LimiterPid,
+ Q, NoAck, self(), Limiter,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
@@ -804,22 +812,23 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
rabbit_misc:protocol_error(not_implemented,
"prefetch_size!=0 (~w)", [Size]);
-handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
- _, State = #ch{limiter_pid = LimiterPid}) ->
- LimiterPid1 = case {LimiterPid, PrefetchCount} of
- {undefined, 0} -> undefined;
- {undefined, _} -> start_limiter(State);
- {_, _} -> LimiterPid
- end,
- LimiterPid2 = case rabbit_limiter:limit(LimiterPid1, PrefetchCount) of
- ok -> LimiterPid1;
- stopped -> unlimit_queues(State)
- end,
- {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}};
+handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _,
+ State = #ch{limiter = Limiter}) ->
+ Limiter1 = case {rabbit_limiter:is_enabled(Limiter), PrefetchCount} of
+ {false, 0} -> Limiter;
+ {false, _} -> enable_limiter(State);
+ {_, _} -> Limiter
+ end,
+ Limiter3 = case rabbit_limiter:limit(Limiter1, PrefetchCount) of
+ ok -> Limiter1;
+ {disabled, Limiter2} -> ok = limit_queues(Limiter2, State),
+ Limiter2
+ end,
+ {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter3}};
handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ,
- limiter_pid = LimiterPid}) ->
+ limiter = Limiter}) ->
OkFun = fun () -> ok end,
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
@@ -833,7 +842,7 @@ handle_method(#'basic.recover_async'{requeue = true},
QPid, lists:reverse(MsgIds), self())
end)
end, ok, UAMQ),
- ok = notify_limiter(LimiterPid, UAMQ),
+ ok = notify_limiter(Limiter, UAMQ),
%% No answer required - basic.recover is the newer, synchronous
%% variant of this method
{noreply, State#ch{unacked_message_q = queue:new()}};
@@ -894,7 +903,6 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
nowait = NoWait},
_, State = #ch{virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
- check_configure_permitted(ExchangeName, State),
check_not_default_exchange(ExchangeName),
_ = rabbit_exchange:lookup_or_die(ExchangeName),
return_ok(State, NoWait, #'exchange.declare_ok'{});
@@ -990,7 +998,6 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
_, State = #ch{virtual_host = VHostPath,
conn_pid = ConnPid}) ->
QueueName = rabbit_misc:r(VHostPath, queue, QueueNameBin),
- check_configure_permitted(QueueName, State),
{{ok, MessageCount, ConsumerCount}, #amqqueue{} = Q} =
rabbit_amqqueue:with_or_die(
QueueName, fun (Q) -> {rabbit_amqqueue:stat(Q), Q} end),
@@ -1047,33 +1054,33 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
-
handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) ->
rabbit_misc:protocol_error(
precondition_failed, "cannot switch from confirm to tx mode", []);
-handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none}) ->
- {reply, #'tx.select_ok'{}, new_tx(State)};
-
handle_method(#'tx.select'{}, _, State) ->
- {reply, #'tx.select_ok'{}, State};
+ {reply, #'tx.select_ok'{}, State#ch{tx_status = in_progress}};
-handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) ->
+handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
-handle_method(#'tx.commit'{}, _, State) ->
- {reply, #'tx.commit_ok'{}, internal_commit(State)};
+handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
+ uncommitted_ack_q = TAQ}) ->
+ State1 = new_tx(ack(TAQ, rabbit_misc:queue_fold(fun deliver_to_queues/2,
+ State, TMQ))),
+ {noreply, maybe_complete_tx(State1#ch{tx_status = committing})};
-handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) ->
+handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
-handle_method(#'tx.rollback'{}, _, State) ->
- {reply, #'tx.rollback_ok'{}, internal_rollback(State)};
+handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ,
+ uncommitted_ack_q = TAQ}) ->
+ {reply, #'tx.rollback_ok'{}, new_tx(State#ch{unacked_message_q =
+ queue:join(TAQ, UAMQ)})};
-handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId})
- when TxId =/= none ->
+handle_method(#'confirm.select'{}, _, #ch{tx_status = in_progress}) ->
rabbit_misc:protocol_error(
precondition_failed, "cannot switch from tx to confirm mode", []);
@@ -1082,23 +1089,23 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
NoWait, #'confirm.select_ok'{});
handle_method(#'channel.flow'{active = true}, _,
- State = #ch{limiter_pid = LimiterPid}) ->
- LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of
- ok -> LimiterPid;
- stopped -> unlimit_queues(State)
- end,
- {reply, #'channel.flow_ok'{active = true},
- State#ch{limiter_pid = LimiterPid1}};
+ State = #ch{limiter = Limiter}) ->
+ Limiter2 = case rabbit_limiter:unblock(Limiter) of
+ ok -> Limiter;
+ {disabled, Limiter1} -> ok = limit_queues(Limiter1, State),
+ Limiter1
+ end,
+ {reply, #'channel.flow_ok'{active = true}, State#ch{limiter = Limiter2}};
handle_method(#'channel.flow'{active = false}, _,
- State = #ch{limiter_pid = LimiterPid,
- consumer_mapping = Consumers}) ->
- LimiterPid1 = case LimiterPid of
- undefined -> start_limiter(State);
- Other -> Other
- end,
- State1 = State#ch{limiter_pid = LimiterPid1},
- ok = rabbit_limiter:block(LimiterPid1),
+ State = #ch{consumer_mapping = Consumers,
+ limiter = Limiter}) ->
+ Limiter1 = case rabbit_limiter:is_enabled(Limiter) of
+ true -> Limiter;
+ false -> enable_limiter(State)
+ end,
+ State1 = State#ch{limiter = Limiter1},
+ ok = rabbit_limiter:block(Limiter1),
case consumer_queues(Consumers) of
[] -> {reply, #'channel.flow_ok'{active = false}, State1};
QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} ||
@@ -1139,10 +1146,16 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
%% process_confirms to prevent each MsgSeqNo being removed from
%% the set one by one which which would be inefficient
State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
- {Nack, SendFun} = case Reason of
- normal -> {false, fun record_confirms/2};
- _ -> {true, fun send_nacks/2}
- end,
+ {Nack, SendFun} =
+ case Reason of
+ Reason when Reason =:= noproc; Reason =:= noconnection;
+ Reason =:= normal; Reason =:= shutdown ->
+ {false, fun record_confirms/2};
+ {shutdown, _} ->
+ {false, fun record_confirms/2};
+ _ ->
+ {true, fun send_nacks/2}
+ end,
{MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
erase_queue_stats(QPid),
State3 = SendFun(MXs, State2),
@@ -1222,7 +1235,7 @@ reject(DeliveryTag, Requeue, Multiple, State = #ch{unacked_message_q = UAMQ}) ->
fun (QPid, MsgIds, ok) ->
rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
end, ok, Acked),
- ok = notify_limiter(State#ch.limiter_pid, Acked),
+ ok = notify_limiter(State#ch.limiter, Acked),
{noreply, State#ch{unacked_message_q = Remaining}}.
ack_record(DeliveryTag, ConsumerTag,
@@ -1252,55 +1265,24 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
precondition_failed, "unknown delivery tag ~w", [DeliveryTag])
end.
-add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) ->
- State#ch{tx_participants = sets:union(Participants,
- sets:from_list(MoreP))}.
-
-ack(TxnKey, UAQ) ->
- fold_per_queue(
- fun (QPid, MsgIds, L) ->
- ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()),
- [{QPid, length(MsgIds)} | L]
- end, [], UAQ).
-
-make_tx_id() -> rabbit_guid:guid().
-
-new_tx(State) ->
- State#ch{transaction_id = make_tx_id(),
- tx_participants = sets:new(),
- uncommitted_ack_q = queue:new()}.
-
-internal_commit(State = #ch{transaction_id = TxnKey,
- tx_participants = Participants}) ->
- case rabbit_amqqueue:commit_all(sets:to_list(Participants),
- TxnKey, self()) of
- ok -> ok = notify_limiter(State#ch.limiter_pid,
- State#ch.uncommitted_ack_q),
- new_tx(State);
- {error, Errors} -> rabbit_misc:protocol_error(
- internal_error, "commit failed: ~w", [Errors])
- end.
+ack(Acked, State) ->
+ QIncs = fold_per_queue(
+ fun (QPid, MsgIds, L) ->
+ ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
+ [{QPid, length(MsgIds)} | L]
+ end, [], Acked),
+ maybe_incr_stats(QIncs, ack, State),
+ ok = notify_limiter(State#ch.limiter, Acked),
+ State.
+
+new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
+ uncommitted_ack_q = queue:new()}.
-internal_rollback(State = #ch{transaction_id = TxnKey,
- tx_participants = Participants,
- uncommitted_ack_q = UAQ,
- unacked_message_q = UAMQ}) ->
- ?LOGDEBUG("rollback ~p~n - ~p acks uncommitted, ~p messages unacked~n",
- [self(),
- queue:len(UAQ),
- queue:len(UAMQ)]),
- ok = rabbit_amqqueue:rollback_all(sets:to_list(Participants),
- TxnKey, self()),
- NewUAMQ = queue:join(UAQ, UAMQ),
- new_tx(State#ch{unacked_message_q = NewUAMQ}).
-
-rollback_and_notify(State = #ch{state = closing}) ->
+notify_queues(State = #ch{state = closing}) ->
{ok, State};
-rollback_and_notify(State = #ch{transaction_id = none}) ->
- {notify_queues(State), State#ch{state = closing}};
-rollback_and_notify(State) ->
- State1 = internal_rollback(State),
- {notify_queues(State1), State1#ch{state = closing}}.
+notify_queues(State = #ch{consumer_mapping = Consumers}) ->
+ {rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()),
+ State#ch{state = closing}}.
fold_per_queue(F, Acc0, UAQ) ->
D = rabbit_misc:queue_fold(
@@ -1314,20 +1296,14 @@ fold_per_queue(F, Acc0, UAQ) ->
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
-start_limiter(State = #ch{unacked_message_q = UAMQ, start_limiter_fun = SLF}) ->
- {ok, LPid} = SLF(queue:len(UAMQ)),
- ok = limit_queues(LPid, State),
- LPid.
+enable_limiter(State = #ch{unacked_message_q = UAMQ,
+ limiter = Limiter}) ->
+ Limiter1 = rabbit_limiter:enable(Limiter, queue:len(UAMQ)),
+ ok = limit_queues(Limiter1, State),
+ Limiter1.
-notify_queues(#ch{consumer_mapping = Consumers}) ->
- rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()).
-
-unlimit_queues(State) ->
- ok = limit_queues(undefined, State),
- undefined.
-
-limit_queues(LPid, #ch{consumer_mapping = Consumers}) ->
- rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid).
+limit_queues(Limiter, #ch{consumer_mapping = Consumers}) ->
+ rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), Limiter).
consumer_queues(Consumers) ->
lists:usort([QPid ||
@@ -1338,25 +1314,37 @@ consumer_queues(Consumers) ->
%% for messages delivered to subscribed consumers, but not acks for
%% messages sent in a response to a basic.get (identified by their
%% 'none' consumer tag)
-notify_limiter(undefined, _Acked) ->
- ok;
-notify_limiter(LimiterPid, Acked) ->
- case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc;
- ({_, _, _}, Acc) -> Acc + 1
- end, 0, Acked) of
- 0 -> ok;
- Count -> rabbit_limiter:ack(LimiterPid, Count)
+notify_limiter(Limiter, Acked) ->
+ case rabbit_limiter:is_enabled(Limiter) of
+ false -> ok;
+ true -> case rabbit_misc:queue_fold(fun ({_, none, _}, Acc) -> Acc;
+ ({_, _, _}, Acc) -> Acc + 1
+ end, 0, Acked) of
+ 0 -> ok;
+ Count -> rabbit_limiter:ack(Limiter, Count)
+ end
end.
+deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
+ exchange_name = XName},
+ msg_seq_no = MsgSeqNo},
+ QNames}, State) ->
+ {RoutingRes, DeliveredQPids} = rabbit_router:deliver(QNames, Delivery),
+ State1 = process_routing_result(RoutingRes, DeliveredQPids,
+ XName, MsgSeqNo, Message, State),
+ maybe_incr_stats([{XName, 1} |
+ [{{QPid, XName}, 1} ||
+ QPid <- DeliveredQPids]], publish, State1),
+ State1.
+
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
return_unroutable, State),
record_confirm(MsgSeqNo, XName, State);
process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_consumers),
- maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
- return_not_delivered, State),
+ maybe_incr_stats([{XName, 1}], return_not_delivered, State),
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
@@ -1386,20 +1374,25 @@ lock_message(false, _MsgStruct, State) ->
send_nacks([], State) ->
State;
-send_nacks(MXs, State) ->
+send_nacks(MXs, State = #ch{tx_status = none}) ->
MsgSeqNos = [ MsgSeqNo || {MsgSeqNo, _} <- MXs ],
coalesce_and_send(MsgSeqNos,
fun(MsgSeqNo, Multiple) ->
#'basic.nack'{delivery_tag = MsgSeqNo,
multiple = Multiple}
- end, State).
+ end, State);
+send_nacks(_, State) ->
+ maybe_complete_tx(State#ch{tx_status = failed}).
-send_confirms(State = #ch{confirmed = C}) ->
+send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
C1 = lists:append(C),
MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], confirm, State),
MsgSeqNo
end || {MsgSeqNo, ExchangeName} <- C1 ],
- send_confirms(MsgSeqNos, State #ch{confirmed = []}).
+ send_confirms(MsgSeqNos, State #ch{confirmed = []});
+send_confirms(State) ->
+ maybe_complete_tx(State).
+
send_confirms([], State) ->
State;
send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
@@ -1429,31 +1422,56 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
State.
+maybe_complete_tx(State = #ch{tx_status = in_progress}) ->
+ State;
+maybe_complete_tx(State = #ch{unconfirmed_mq = UMQ}) ->
+ case gb_trees:is_empty(UMQ) of
+ false -> State;
+ true -> complete_tx(State#ch{confirmed = []})
+ end.
+
+complete_tx(State = #ch{tx_status = committing}) ->
+ ok = rabbit_writer:send_command(State#ch.writer_pid, #'tx.commit_ok'{}),
+ State#ch{tx_status = in_progress};
+complete_tx(State = #ch{tx_status = failed}) ->
+ {noreply, State1} = send_exception(
+ rabbit_misc:amqp_error(
+ precondition_failed, "partial tx completion", [],
+ 'tx.commit'),
+ State),
+ State1#ch{tx_status = in_progress}.
+
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _) -> self();
i(connection, #ch{conn_pid = ConnPid}) -> ConnPid;
i(number, #ch{channel = Channel}) -> Channel;
i(user, #ch{user = User}) -> User#user.username;
i(vhost, #ch{virtual_host = VHost}) -> VHost;
-i(transactional, #ch{transaction_id = TxnKey}) -> TxnKey =/= none;
+i(transactional, #ch{tx_status = TE}) -> TE =/= none;
i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) ->
gb_trees:size(UMQ);
-i(messages_unacknowledged, #ch{unacked_message_q = UAMQ,
- uncommitted_ack_q = UAQ}) ->
- queue:len(UAMQ) + queue:len(UAQ);
-i(acks_uncommitted, #ch{uncommitted_ack_q = UAQ}) ->
- queue:len(UAQ);
-i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
- rabbit_limiter:get_limit(LimiterPid);
-i(client_flow_blocked, #ch{limiter_pid = LimiterPid}) ->
- rabbit_limiter:is_blocked(LimiterPid);
+i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
+ queue:len(UAMQ);
+i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->
+ queue:len(TMQ);
+i(acks_uncommitted, #ch{uncommitted_ack_q = TAQ}) ->
+ queue:len(TAQ);
+i(prefetch_count, #ch{limiter = Limiter}) ->
+ rabbit_limiter:get_limit(Limiter);
+i(client_flow_blocked, #ch{limiter = Limiter}) ->
+ rabbit_limiter:is_blocked(Limiter);
i(Item, _) ->
throw({bad_argument, Item}).
+maybe_incr_redeliver_stats(true, QPid, State) ->
+ maybe_incr_stats([{QPid, 1}], redeliver, State);
+maybe_incr_redeliver_stats(_, _, _) ->
+ ok.
+
maybe_incr_stats(QXIncs, Measure, #ch{stats_timer = StatsTimer}) ->
case rabbit_event:stats_level(StatsTimer) of
fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs];
@@ -1488,10 +1506,10 @@ update_measures(Type, QX, Inc, Measure) ->
put({Type, QX},
orddict:store(Measure, Cur + Inc, Measures)).
-internal_emit_stats(State) ->
- internal_emit_stats(State, []).
+emit_stats(State) ->
+ emit_stats(State, []).
-internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) ->
+emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) ->
CoarseStats = infos(?STATISTICS_KEYS, State),
case rabbit_event:stats_level(StatsTimer) of
coarse ->
View
2 src/rabbit_common.app.src
@@ -1,6 +1,6 @@
{application, rabbit_common,
[{description, "RabbitMQ Common Libraries"},
- {vsn, "2.5.0"},
+ {vsn, "2.6.0"},
{modules, []},
{registered, []},
{env, []},
View
108 src/rabbit_framing_amqp_0_8.erl
@@ -136,60 +136,60 @@
| #'test.string_ok'{} | #'test.table'{} | #'test.table_ok'{} | #'test.content'{}
| #'test.content_ok'{} )).
-type(amqp_method_field_name() ::
- ( heartbeat | ticket | exchange | integer_4
- | nowait | nowait | response | consumer_tag
- | prefetch_size | multiple | queue | version_minor
- | mechanisms | exchange | reply_code | routing_key
- | string_op | ticket | routing_key | mandatory
- | integer_op | consumer_tag | arguments | integer_1
- | requeue | consumer_tag | host | locale
- | operation | content_size | ticket | durable
- | no_ack | identifier | result | reply_text
- | immediate | queue | consumer_tag | multiple
- | nowait | message_count | ticket | exclusive
- | exchange | string_result | reply_text | server_properties
- | routing_key | active | reply_code | no_local
- | exchange | requeue | queue | delivery_tag
- | prefetch_count | virtual_host | if_empty | consumer_tag
- | routing_key | queue | message_count | exclusive
- | channel_max | string_2 | consumer_tag | immediate
- | consumer_tag | integer_2 | content_checksum | passive
- | no_ack | consumer_tag | exchange | class_id
- | prefetch_size | routing_key | exchange | realm
- | routing_key | ticket | meta_data | queue
- | exchange | passive | queue | reply_code
- | queue | capabilities | result | ticket
- | routing_key | internal | mechanism | exchange
- | requeue | channel_max | dtx_identifier | global
- | ticket | ticket | reply_text | locales
- | redelivered | consume_rate | method_id | type
- | consumer_tag | nowait | reply_code | mandatory
- | out_of_band | if_unused | operation | nowait
- | durable | delivery_tag | no_ack | insist
- | consumer_tag | nowait | prefetch_count | string_1
- | mandatory | queue | exchange | requeue
- | routing_key | consumer_tag | routing_key | ticket
- | heartbeat | prefetch_size | reply_text | consumer_tag
- | queue | ticket | reply_text | message_count
- | routing_key | identifier | consumer_count | frame_max
- | exchange | consumer_tag | exclusive | exchange
- | global | message_count | reply_code | arguments
- | active | cluster_id | delivery_tag | exclusive
- | immediate | response | ticket | exchange
- | consumer_tag | client_properties | redelivered | nowait
- | arguments | integer_result | exclusive | details
- | known_hosts | global | staged_size | no_local
- | challenge | no_local | class_id | delivery_tag
- | ticket | known_hosts | arguments | ticket
- | delivery_tag | frame_max | exchange | exchange
- | consumer_tag | queue | routing_key | table
- | delivery_tag | passive | integer_3 | queue
- | if_unused | active | method_id | version_major
- | ticket | auto_delete | reply_text | nowait
- | identifier | nowait | read | nowait
- | ticket | nowait | auto_delete | delivery_tag
- | nowait | prefetch_count | redelivered | write
- | reply_code | delivery_tag )).
+ ( frame_max | no_ack | redelivered | requeue
+ | operation | exclusive | known_hosts | global
+ | staged_size | message_count | immediate | mechanism
+ | delivery_tag | ticket | version_major | server_properties
+ | nowait | arguments | ticket | identifier
+ | exchange | integer_op | active | routing_key
+ | routing_key | table | nowait | delivery_tag
+ | queue | prefetch_count | ticket | integer_4
+ | consumer_tag | identifier | heartbeat | ticket
+ | auto_delete | passive | channel_max | nowait
+ | read | reply_code | mandatory | exchange
+ | delivery_tag | response | no_ack | realm
+ | redelivered | integer_result | reply_code | version_minor
+ | exchange | ticket | nowait | operation
+ | arguments | queue | host | consumer_tag
+ | prefetch_size | multiple | ticket | if_unused
+ | result | channel_max | exchange | global
+ | routing_key | routing_key | nowait | no_ack
+ | exchange | mandatory | locale | string_result
+ | integer_1 | requeue | exclusive | type
+ | ticket | reply_text | content_size | routing_key
+ | active | if_unused | exchange | identifier
+ | global | no_local | queue | queue
+ | consumer_tag | ticket | virtual_host | exchange
+ | queue | client_properties | reply_text | multiple
+ | consumer_tag | internal | passive | delivery_tag
+ | prefetch_count | class_id | message_count | queue
+ | consumer_tag | consumer_tag | exchange | message_count
+ | challenge | queue | exchange | string_2
+ | consumer_tag | passive | immediate | cluster_id
+ | consumer_tag | integer_2 | queue | capabilities
+ | exclusive | no_local | dtx_identifier | heartbeat
+ | prefetch_size | ticket | routing_key | ticket
+ | routing_key | ticket | delivery_tag | exchange
+ | locales | details | consumer_tag | reply_code
+ | queue | method_id | result | reply_code
+ | method_id | exchange | out_of_band | requeue
+ | consumer_tag | reply_text | reply_code | exchange
+ | consume_rate | nowait | routing_key | nowait
+ | durable | requeue | exclusive | nowait
+ | integer_3 | mechanisms | exchange | active
+ | redelivered | ticket | delivery_tag | queue
+ | ticket | reply_text | reply_text | nowait
+ | prefetch_count | consumer_count | class_id | consumer_tag
+ | mandatory | reply_text | string_1 | message_count
+ | ticket | content_checksum | prefetch_size | arguments
+ | if_empty | consumer_tag | string_op | delivery_tag
+ | ticket | nowait | meta_data | routing_key
+ | response | frame_max | queue | insist
+ | routing_key | nowait | durable | reply_code
+ | arguments | routing_key | write | exclusive
+ | immediate | exchange | no_local | auto_delete
+ | nowait | consumer_tag | consumer_tag | known_hosts
+ | consumer_tag | delivery_tag )).
-type(amqp_property_record() ::
( #'P_connection'{} | #'P_channel'{} | #'P_access'{} | #'P_exchange'{}
| #'P_queue'{} | #'P_basic'{} | #'P_file'{} | #'P_stream'{}
View
74 src/rabbit_framing_amqp_0_9_1.erl
@@ -115,43 +115,43 @@
| #'tx.commit'{} | #'tx.commit_ok'{} | #'tx.rollback'{} | #'tx.rollback_ok'{}
| #'confirm.select'{} | #'confirm.select_ok'{} )).
-type(amqp_method_field_name() ::
- ( response | ticket | response | known_hosts
- | message_count | realm | nowait | delivery_tag
- | reply_code | exclusive | delivery_tag | cluster_id
- | delivery_tag | reply_text | arguments | destination
- | source | frame_max | queue | nowait
- | locale | immediate | routing_key | queue
- | mechanism | routing_key | routing_key | exclusive
- | message_count | reply_text | internal | channel_id
- | arguments | no_ack | destination | ticket
- | capabilities | source | heartbeat | arguments
- | auto_delete | channel_max | multiple | mechanisms
- | client_properties | nowait | passive | consumer_tag
- | class_id | insist | queue | exchange
- | consumer_tag | method_id | passive | prefetch_count
- | no_local | routing_key | ticket | if_unused
- | class_id | redelivered | challenge | nowait
- | routing_key | active | nowait | ticket
- | auto_delete | reply_code | active | read
- | routing_key | consumer_tag | requeue | nowait
- | ticket | passive | nowait | queue
- | ticket | ticket | requeue | ticket
- | exchange | global | mandatory | reply_text
- | write | no_ack | queue | nowait
- | locales | delivery_tag | prefetch_size | nowait
- | virtual_host | arguments | frame_max | ticket
- | if_unused | queue | nowait | exchange
- | ticket | arguments | routing_key | redelivered
- | delivery_tag | version_major | ticket | arguments
- | requeue | active | exchange | exchange
- | arguments | durable | exchange | consumer_count
- | heartbeat | exclusive | consumer_tag | routing_key
- | durable | multiple | type | requeue
- | message_count | version_minor | consumer_tag | method_id
- | if_empty | queue | exchange | ticket
- | out_of_band | server_properties | queue | exchange
- | reply_code | ticket | message_count | nowait
- | channel_max )).
+ ( version_minor | ticket | queue | consumer_tag
+ | ticket | reply_text | queue | routing_key
+ | reply_code | redelivered | delivery_tag | delivery_tag
+ | type | ticket | virtual_host | message_count
+ | frame_max | nowait | locale | reply_text
+ | immediate | queue | requeue | consumer_tag
+ | realm | version_major | durable | nowait
+ | channel_id | destination | arguments | if_empty
+ | source | exchange | ticket | heartbeat
+ | routing_key | queue | exchange | auto_delete
+ | challenge | multiple | mechanisms | passive
+ | exclusive | class_id | class_id | requeue
+ | exchange | arguments | exchange | source
+ | delivery_tag | insist | ticket | delivery_tag
+ | cluster_id | if_unused | internal | capabilities
+ | message_count | locales | message_count | ticket
+ | nowait | no_local | routing_key | auto_delete
+ | active | routing_key | active | routing_key
+ | routing_key | exchange | reply_text | nowait
+ | delivery_tag | channel_max | routing_key | nowait
+ | queue | ticket | ticket | nowait
+ | destination | consumer_count | redelivered | ticket
+ | multiple | nowait | prefetch_size | passive
+ | nowait | nowait | arguments | ticket
+ | frame_max | no_ack | arguments | consumer_tag
+ | nowait | exchange | prefetch_count | exchange
+ | ticket | write | response | nowait
+ | arguments | durable | client_properties | active
+ | no_ack | arguments | queue | ticket
+ | exchange | reply_code | heartbeat | method_id
+ | routing_key | arguments | mechanism | server_properties
+ | read | if_unused | consumer_tag | message_count
+ | consumer_tag | exclusive | queue | response
+ | global | mandatory | out_of_band | requeue
+ | queue | reply_code | known_hosts | ticket