Permalink
Browse files

Updated to 2.8.2.

  • Loading branch information...
1 parent be44f23 commit be37790892c35debda9e338f6569765ec71a89d5 @jbrisbin committed Apr 30, 2012
View
31 include/rabbit_auth_backend_spec.hrl
@@ -1,31 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
-%%
-
--ifdef(use_specs).
-
--spec(description/0 :: () -> [{atom(), any()}]).
-
--spec(check_user_login/2 :: (rabbit_types:username(), [term()]) ->
- {'ok', rabbit_types:user()} |
- {'refused', string(), [any()]} |
- {'error', any()}).
--spec(check_vhost_access/2 :: (rabbit_types:user(), rabbit_types:vhost()) ->
- boolean() | {'error', any()}).
--spec(check_resource_access/3 :: (rabbit_types:user(),
- rabbit_types:r(atom()),
- rabbit_access_control:permission_atom()) ->
- boolean() | {'error', any()}).
--endif.
View
28 include/rabbit_auth_mechanism_spec.hrl
@@ -1,28 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
-%%
-
--ifdef(use_specs).
-
--spec(description/0 :: () -> [{atom(), any()}]).
--spec(should_offer/1 :: (rabbit_net:socket()) -> boolean()).
--spec(init/1 :: (rabbit_net:socket()) -> any()).
--spec(handle_response/2 :: (binary(), any()) ->
- {'ok', rabbit_types:user()} |
- {'challenge', binary(), any()} |
- {'protocol_error', string(), [any()]} |
- {'refused', string(), [any()]}).
-
--endif.
View
70 include/rabbit_backing_queue_spec.hrl
@@ -1,70 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
-%%
-
--type(fetch_result(Ack) ::
- ('empty' |
- %% Message, IsDelivered, AckTag, Remaining_Len
- {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
--type(is_durable() :: boolean()).
--type(attempt_recovery() :: boolean()).
--type(purged_msg_count() :: non_neg_integer()).
--type(confirm_required() :: boolean()).
--type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
--type(duration() :: ('undefined' | 'infinity' | number())).
-
--type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok')).
-
--spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok').
--spec(stop/0 :: () -> 'ok').
--spec(init/3 :: (rabbit_types:amqqueue(), attempt_recovery(),
- async_callback()) -> state()).
--spec(terminate/2 :: (any(), state()) -> state()).
--spec(delete_and_terminate/2 :: (any(), state()) -> state()).
--spec(purge/1 :: (state()) -> {purged_msg_count(), state()}).
--spec(publish/4 :: (rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state()) ->
- state()).
--spec(publish_delivered/5 :: (true, rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state())
- -> {ack(), state()};
- (false, rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state())
- -> {undefined, state()}).
--spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}).
--spec(dropwhile/3 ::
- (fun ((rabbit_types:message_properties()) -> boolean()),
- msg_fun() | 'undefined', state())
- -> state()).
--spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()};
- (false, state()) -> {fetch_result(undefined), state()}).
--spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
--spec(fold/3 :: (msg_fun(), state(), [ack()]) -> state()).
--spec(requeue/2 :: ([ack()], state())
- -> {[rabbit_guid:guid()], state()}).
--spec(len/1 :: (state()) -> non_neg_integer()).
--spec(is_empty/1 :: (state()) -> boolean()).
--spec(set_ram_duration_target/2 ::
- (duration(), state()) -> state()).
--spec(ram_duration/1 :: (state()) -> {duration(), state()}).
--spec(needs_timeout/1 :: (state()) -> 'false' | 'timed' | 'idle').
--spec(timeout/1 :: (state()) -> state()).
--spec(handle_pre_hibernate/1 :: (state()) -> state()).
--spec(status/1 :: (state()) -> [{atom(), any()}]).
--spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()).
--spec(is_duplicate/2 ::
- (rabbit_types:basic_message(), state()) ->
- {'false'|'published'|'discarded', state()}).
--spec(discard/3 :: (rabbit_types:basic_message(), pid(), state()) -> state()).
View
38 include/rabbit_exchange_type_spec.hrl
@@ -1,38 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
-%%
-
--ifdef(use_specs).
-
--type(tx() :: 'transaction' | 'none').
--type(serial() :: pos_integer() | tx()).
-
--spec(description/0 :: () -> [{atom(), any()}]).
--spec(serialise_events/0 :: () -> boolean()).
--spec(route/2 :: (rabbit_types:exchange(), rabbit_types:delivery())
- -> rabbit_router:match_result()).
--spec(validate/1 :: (rabbit_types:exchange()) -> 'ok').
--spec(create/2 :: (tx(), rabbit_types:exchange()) -> 'ok').
--spec(delete/3 :: (tx(), rabbit_types:exchange(),
- [rabbit_types:binding()]) -> 'ok').
--spec(add_binding/3 :: (serial(), rabbit_types:exchange(),
- rabbit_types:binding()) -> 'ok').
--spec(remove_bindings/3 :: (serial(), rabbit_types:exchange(),
- [rabbit_types:binding()]) -> 'ok').
--spec(assert_args_equivalence/2 ::
- (rabbit_types:exchange(), rabbit_framing:amqp_table())
- -> 'ok' | rabbit_types:connection_exit()).
-
--endif.
View
45 include/rabbit_msg_store_index.hrl
@@ -1,45 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
-%%
-
--include("rabbit_msg_store.hrl").
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--type(dir() :: any()).
--type(index_state() :: any()).
--type(keyvalue() :: any()).
--type(fieldpos() :: non_neg_integer()).
--type(fieldvalue() :: any()).
-
--spec(new/1 :: (dir()) -> index_state()).
--spec(recover/1 :: (dir()) -> rabbit_types:ok_or_error2(index_state(), any())).
--spec(lookup/2 ::
- (rabbit_types:msg_id(), index_state()) -> ('not_found' | keyvalue())).
--spec(insert/2 :: (keyvalue(), index_state()) -> 'ok').
--spec(update/2 :: (keyvalue(), index_state()) -> 'ok').
--spec(update_fields/3 :: (rabbit_types:msg_id(), ({fieldpos(), fieldvalue()} |
- [{fieldpos(), fieldvalue()}]),
- index_state()) -> 'ok').
--spec(delete/2 :: (rabbit_types:msg_id(), index_state()) -> 'ok').
--spec(delete_object/2 :: (keyvalue(), index_state()) -> 'ok').
--spec(delete_by_file/2 :: (fieldvalue(), index_state()) -> 'ok').
--spec(terminate/1 :: (index_state()) -> any()).
-
--endif.
-
-%%----------------------------------------------------------------------------
View
65 src/gen_server2.erl
@@ -31,13 +31,13 @@
%% handle_pre_hibernate/1 then the default action is to hibernate.
%%
%% 6) init can return a 4th arg, {backoff, InitialTimeout,
-%% MinimumTimeout, DesiredHibernatePeriod} (all in
-%% milliseconds). Then, on all callbacks which can return a timeout
-%% (including init), timeout can be 'hibernate'. When this is the
-%% case, the current timeout value will be used (initially, the
-%% InitialTimeout supplied from init). After this timeout has
-%% occurred, hibernation will occur as normal. Upon awaking, a new
-%% current timeout value will be calculated.
+%% MinimumTimeout, DesiredHibernatePeriod} (all in milliseconds,
+%% 'infinity' does not make sense here). Then, on all callbacks which
+%% can return a timeout (including init), timeout can be
+%% 'hibernate'. When this is the case, the current timeout value will
+%% be used (initially, the InitialTimeout supplied from init). After
+%% this timeout has occurred, hibernation will occur as normal. Upon
+%% awaking, a new current timeout value will be calculated.
%%
%% The purpose is that the gen_server2 takes care of adjusting the
%% current timeout value such that the process will increase the
@@ -135,9 +135,10 @@
%%% Reason = normal | shutdown | Term, terminate(State) is called
%%%
%%% terminate(Reason, State) Let the user module clean up
+%%% Reason = normal | shutdown | {shutdown, Term} | Term
%%% always called when server terminates
%%%
-%%% ==> ok
+%%% ==> ok | Term
%%%
%%% handle_pre_hibernate(State)
%%%
@@ -182,8 +183,6 @@
multi_call/2, multi_call/3, multi_call/4,
enter_loop/3, enter_loop/4, enter_loop/5, enter_loop/6, wake_hib/1]).
--export([behaviour_info/1]).
-
%% System exports
-export([system_continue/3,
system_terminate/4,
@@ -200,12 +199,12 @@
timeout_state, queue, debug, prioritise_call,
prioritise_cast, prioritise_info}).
+-ifdef(use_specs).
+
%%%=========================================================================
%%% Specs. These exist only to shut up dialyzer's warnings
%%%=========================================================================
--ifdef(use_specs).
-
-type(gs2_state() :: #gs2_state{}).
-spec(handle_common_termination/3 ::
@@ -214,18 +213,58 @@
-spec(pre_hibernate/1 :: (gs2_state()) -> no_return()).
-spec(system_terminate/4 :: (_, _, _, gs2_state()) -> no_return()).
--endif.
+-type(millis() :: non_neg_integer()).
%%%=========================================================================
%%% API
%%%=========================================================================
+-callback init(Args :: term()) ->
+ {ok, State :: term()} |
+ {ok, State :: term(), timeout() | hibernate} |
+ {ok, State :: term(), timeout() | hibernate,
+ {backoff, millis(), millis(), millis()}} |
+ ignore |
+ {stop, Reason :: term()}.
+-callback handle_call(Request :: term(), From :: {pid(), Tag :: term()},
+ State :: term()) ->
+ {reply, Reply :: term(), NewState :: term()} |
+ {reply, Reply :: term(), NewState :: term(), timeout() | hibernate} |
+ {noreply, NewState :: term()} |
+ {noreply, NewState :: term(), timeout() | hibernate} |
+ {stop, Reason :: term(),
+ Reply :: term(), NewState :: term()}.
+-callback handle_cast(Request :: term(), State :: term()) ->
+ {noreply, NewState :: term()} |
+ {noreply, NewState :: term(), timeout() | hibernate} |
+ {stop, Reason :: term(), NewState :: term()}.
+-callback handle_info(Info :: term(), State :: term()) ->
+ {noreply, NewState :: term()} |
+ {noreply, NewState :: term(), timeout() | hibernate} |
+ {stop, Reason :: term(), NewState :: term()}.
+-callback terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
+ State :: term()) ->
+ ok | term().
+-callback code_change(OldVsn :: (term() | {down, term()}), State :: term(),
+ Extra :: term()) ->
+ {ok, NewState :: term()} | {error, Reason :: term()}.
+
+%% It's not possible to define "optional" -callbacks, so putting specs
+%% for handle_pre_hibernate/1 and handle_post_hibernate/1 will result
+%% in warnings (the same applied for the behaviour_info before).
+
+-else.
+
+-export([behaviour_info/1]).
+
behaviour_info(callbacks) ->
[{init,1},{handle_call,3},{handle_cast,2},{handle_info,2},
{terminate,2},{code_change,3}];
behaviour_info(_Other) ->
undefined.
+-endif.
+
%%% -----------------------------------------------------------------
%%% Starts a generic server.
%%% start(Mod, Args, Options)
View
76 src/mirrored_supervisor.erl
@@ -120,8 +120,6 @@
delete_child/2, terminate_child/2,
which_children/1, count_children/1, check_childspecs/1]).
--export([behaviour_info/1]).
-
-behaviour(?GEN_SERVER).
-behaviour(?SUPERVISOR).
@@ -142,15 +140,20 @@
-ifdef(use_specs).
--type child() :: pid() | 'undefined'.
--type child_id() :: term().
--type modules() :: [module()] | 'dynamic'.
--type worker() :: 'worker' | 'supervisor'.
--type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}.
--type sup_ref() :: (Name :: atom())
- | {Name :: atom(), Node :: node()}
- | {'global', Name :: atom()}
- | pid().
+%%--------------------------------------------------------------------------
+%% Callback behaviour
+%%--------------------------------------------------------------------------
+
+-callback init(Args :: term()) ->
+ {ok, {{RestartStrategy :: supervisor2:strategy(),
+ MaxR :: non_neg_integer(),
+ MaxT :: non_neg_integer()},
+ [ChildSpec :: supervisor2:child_spec()]}}
+ | ignore.
+
+%%--------------------------------------------------------------------------
+%% Specs
+%%--------------------------------------------------------------------------
-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term().
-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}.
@@ -163,54 +166,26 @@
Args :: term().
-spec start_link(SupName, GroupName, Module, Args) -> startlink_ret() when
- SupName :: sup_name(),
+ SupName :: supervisor2:sup_name(),
GroupName :: group_name(),
Module :: module(),
Args :: term().
--spec start_child(SupRef, ChildSpec) -> supervisor:startchild_ret() when
- SupRef :: sup_ref(),
- ChildSpec :: supervisor:child_spec() | (List :: [term()]).
-
--spec restart_child(SupRef, Id) -> Result when
- SupRef :: sup_ref(),
- Id :: child_id(),
- Result :: {'ok', Child :: child()}
- | {'ok', Child :: child(), Info :: term()}
- | {'error', Error},
- Error :: 'running' | 'not_found' | 'simple_one_for_one' | term().
-
--spec delete_child(SupRef, Id) -> Result when
- SupRef :: sup_ref(),
- Id :: child_id(),
- Result :: 'ok' | {'error', Error},
- Error :: 'running' | 'not_found' | 'simple_one_for_one'.
-
--spec terminate_child(SupRef, Id) -> Result when
- SupRef :: sup_ref(),
- Id :: pid() | child_id(),
- Result :: 'ok' | {'error', Error},
- Error :: 'not_found' | 'simple_one_for_one'.
-
--spec which_children(SupRef) -> [{Id,Child,Type,Modules}] when
- SupRef :: sup_ref(),
- Id :: child_id() | 'undefined',
- Child :: child(),
- Type :: worker(),
- Modules :: modules().
-
--spec check_childspecs(ChildSpecs) -> Result when
- ChildSpecs :: [supervisor:child_spec()],
- Result :: 'ok' | {'error', Error :: term()}.
-
-spec start_internal(Group, ChildSpecs) -> Result when
Group :: group_name(),
- ChildSpecs :: [supervisor:child_spec()],
- Result :: startlink_ret().
+ ChildSpecs :: [supervisor2:child_spec()],
+ Result :: supervisor2:startlink_ret().
-spec create_tables() -> Result when
Result :: 'ok'.
+-else.
+
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) -> [{init,1}];
+behaviour_info(_Other) -> undefined.
+
-endif.
%%----------------------------------------------------------------------------
@@ -250,9 +225,6 @@ which_children(Sup) -> fold(which_children, Sup, fun lists:append/2).
count_children(Sup) -> fold(count_children, Sup, fun add_proplists/2).
check_childspecs(Specs) -> ?SUPERVISOR:check_childspecs(Specs).
-behaviour_info(callbacks) -> [{init,1}];
-behaviour_info(_Other) -> undefined.
-
call(Sup, Msg) ->
?GEN_SERVER:call(child(Sup, mirroring), Msg, infinity).
View
64 src/pmon.erl
@@ -0,0 +1,64 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(pmon).
+
+-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
+ monitored/1, is_empty/1]).
+
+-ifdef(use_specs).
+
+%%----------------------------------------------------------------------------
+
+-export_type([?MODULE/0]).
+
+-opaque(?MODULE() :: dict()).
+
+-spec(new/0 :: () -> ?MODULE()).
+-spec(monitor/2 :: (pid(), ?MODULE()) -> ?MODULE()).
+-spec(monitor_all/2 :: ([pid()], ?MODULE()) -> ?MODULE()).
+-spec(demonitor/2 :: (pid(), ?MODULE()) -> ?MODULE()).
+-spec(is_monitored/2 :: (pid(), ?MODULE()) -> boolean()).
+-spec(erase/2 :: (pid(), ?MODULE()) -> ?MODULE()).
+-spec(monitored/1 :: (?MODULE()) -> [pid()]).
+-spec(is_empty/1 :: (?MODULE()) -> boolean()).
+
+-endif.
+
+new() -> dict:new().
+
+monitor(Pid, M) ->
+ case dict:is_key(Pid, M) of
+ true -> M;
+ false -> dict:store(Pid, erlang:monitor(process, Pid), M)
+ end.
+
+monitor_all(Pids, M) -> lists:foldl(fun monitor/2, M, Pids).
+
+demonitor(Pid, M) ->
+ case dict:find(Pid, M) of
+ {ok, MRef} -> erlang:demonitor(MRef),
+ dict:erase(Pid, M);
+ error -> M
+ end.
+
+is_monitored(Pid, M) -> dict:is_key(Pid, M).
+
+erase(Pid, M) -> dict:erase(Pid, M).
+
+monitored(M) -> dict:fetch_keys(M).
+
+is_empty(M) -> dict:size(M) == 0.
View
132 src/rabbit_amqqueue.erl
@@ -32,7 +32,7 @@
%% internal
--export([internal_declare/2, internal_delete/1, run_backing_queue/3,
+-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
set_ram_duration_target/2, set_maximum_since_use/2]).
-include("rabbit.hrl").
@@ -109,7 +109,7 @@
-spec(stat/1 ::
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
--spec(delete_immediately/1 :: (rabbit_types:amqqueue()) -> 'ok').
+-spec(delete_immediately/1 :: (qpids()) -> 'ok').
-spec(delete/3 ::
(rabbit_types:amqqueue(), 'false', 'false')
-> qlen();
@@ -144,11 +144,11 @@
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
--spec(internal_delete/1 ::
- (name()) -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit() |
- fun (() -> rabbit_types:ok_or_error('not_found') |
- rabbit_types:connection_exit())).
+-spec(internal_delete/2 ::
+ (name(), pid()) -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit() |
+ fun (() -> rabbit_types:ok_or_error('not_found') |
+ rabbit_types:connection_exit())).
-spec(run_backing_queue/3 ::
(pid(), atom(),
(fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok').
@@ -231,7 +231,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) ->
[ExistingQ = #amqqueue{pid = QPid}] ->
case rabbit_misc:is_process_alive(QPid) of
true -> rabbit_misc:const(ExistingQ);
- false -> TailFun = internal_delete(QueueName),
+ false -> TailFun = internal_delete(QueueName, QPid),
fun () -> TailFun(), ExistingQ end
end
end
@@ -330,54 +330,60 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
[<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]).
check_declare_arguments(QueueName, Args) ->
- [case Fun(rabbit_misc:table_lookup(Args, Key), Args) of
- ok -> ok;
- {error, Error} -> rabbit_misc:protocol_error(
- precondition_failed,
- "invalid arg '~s' for ~s: ~255p",
- [Key, rabbit_misc:rs(QueueName), Error])
- end ||
- {Key, Fun} <-
- [{<<"x-expires">>, fun check_integer_argument/2},
- {<<"x-message-ttl">>, fun check_integer_argument/2},
- {<<"x-ha-policy">>, fun check_ha_policy_argument/2},
- {<<"x-dead-letter-exchange">>, fun check_string_argument/2},
- {<<"x-dead-letter-routing-key">>,
- fun check_dlxrk_argument/2}]],
+ Checks = [{<<"x-expires">>, fun check_positive_int_arg/2},
+ {<<"x-message-ttl">>, fun check_non_neg_int_arg/2},
+ {<<"x-ha-policy">>, fun check_ha_policy_arg/2},
+ {<<"x-dead-letter-exchange">>, fun check_string_arg/2},
+ {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}],
+ [case rabbit_misc:table_lookup(Args, Key) of
+ undefined -> ok;
+ TypeVal -> case Fun(TypeVal, Args) of
+ ok -> ok;
+ {error, Error} -> rabbit_misc:protocol_error(
+ precondition_failed,
+ "invalid arg '~s' for ~s: ~255p",
+ [Key, rabbit_misc:rs(QueueName),
+ Error])
+ end
+ end || {Key, Fun} <- Checks],
ok.
-check_string_argument(undefined, _Args) ->
- ok;
-check_string_argument({longstr, _}, _Args) ->
+check_string_arg({longstr, _}, _Args) ->
ok;
-check_string_argument({Type, _}, _) ->
+check_string_arg({Type, _}, _) ->
{error, {unacceptable_type, Type}}.
-check_integer_argument(undefined, _Args) ->
- ok;
-check_integer_argument({Type, Val}, _Args) when Val > 0 ->
+check_int_arg({Type, _}, _) ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
true -> ok;
false -> {error, {unacceptable_type, Type}}
- end;
-check_integer_argument({_Type, Val}, _Args) ->
- {error, {value_zero_or_less, Val}}.
+ end.
-check_dlxrk_argument(undefined, _Args) ->
- ok;
-check_dlxrk_argument({longstr, _}, Args) ->
+check_positive_int_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val > 0 -> ok;
+ ok -> {error, {value_zero_or_less, Val}};
+ Error -> Error
+ end.
+
+check_non_neg_int_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val >= 0 -> ok;
+ ok -> {error, {value_less_than_zero, Val}};
+ Error -> Error
+ end.
+
+check_dlxrk_arg({longstr, _}, Args) ->
case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of
undefined -> {error, routing_key_but_no_dlx_defined};
_ -> ok
end;
-check_dlxrk_argument({Type, _}, _Args) ->
+check_dlxrk_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
-check_ha_policy_argument(undefined, _Args) ->
- ok;
-check_ha_policy_argument({longstr, <<"all">>}, _Args) ->
+check_ha_policy_arg({longstr, <<"all">>}, _Args) ->
ok;
-check_ha_policy_argument({longstr, <<"nodes">>}, Args) ->
+check_ha_policy_arg({longstr, <<"nodes">>}, Args) ->
case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of
undefined ->
{error, {require, 'x-ha-policy-params'}};
@@ -393,9 +399,9 @@ check_ha_policy_argument({longstr, <<"nodes">>}, Args) ->
{Type, _} ->
{error, {ha_nodes_policy_params_not_array_of_longstr, Type}}
end;
-check_ha_policy_argument({longstr, Policy}, _Args) ->
+check_ha_policy_arg({longstr, Policy}, _Args) ->
{error, {invalid_ha_policy, Policy}};
-check_ha_policy_argument({Type, _}, _Args) ->
+check_ha_policy_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
list() ->
@@ -462,8 +468,9 @@ consumers_all(VHostPath) ->
stat(#amqqueue{pid = QPid}) ->
delegate_call(QPid, stat).
-delete_immediately(#amqqueue{ pid = QPid }) ->
- gen_server2:cast(QPid, delete_immediately).
+delete_immediately(QPids) ->
+ [gen_server2:cast(QPid, delete_immediately) || QPid <- QPids],
+ ok.
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
delegate_call(QPid, {delete, IfUnused, IfEmpty}).
@@ -534,13 +541,19 @@ internal_delete1(QueueName) ->
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName).
-internal_delete(QueueName) ->
+internal_delete(QueueName, QPid) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () ->
case mnesia:wread({rabbit_queue, QueueName}) of
[] -> rabbit_misc:const({error, not_found});
[_] -> Deletions = internal_delete1(QueueName),
- rabbit_binding:process_deletions(Deletions)
+ T = rabbit_binding:process_deletions(Deletions),
+ fun() ->
+ ok = T(),
+ ok = rabbit_event:notify(queue_deleted,
+ [{pid, QPid},
+ {name, QueueName}])
+ end
end
end).
@@ -555,14 +568,25 @@ set_maximum_since_use(QPid, Age) ->
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
- fun () -> Dels = qlc:e(qlc:q([delete_queue(QueueName) ||
- #amqqueue{name = QueueName, pid = Pid,
- slave_pids = []}
- <- mnesia:table(rabbit_queue),
- node(Pid) == Node])),
- rabbit_binding:process_deletions(
- lists:foldl(fun rabbit_binding:combine_deletions/2,
- rabbit_binding:new_deletions(), Dels))
+ fun () -> QsDels =
+ qlc:e(qlc:q([{{QName, Pid}, delete_queue(QName)} ||
+ #amqqueue{name = QName, pid = Pid,
+ slave_pids = []}
+ <- mnesia:table(rabbit_queue),
+ node(Pid) == Node])),
+ {Qs, Dels} = lists:unzip(QsDels),
+ T = rabbit_binding:process_deletions(
+ lists:foldl(fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(), Dels)),
+ fun () ->
+ T(),
+ lists:foreach(
+ fun({QName, QPid}) ->
+ ok = rabbit_event:notify(queue_deleted,
+ [{pid, QPid},
+ {name, QName}])
+ end, Qs)
+ end
end).
delete_queue(QueueName) ->
View
83 src/rabbit_auth_backend.erl
@@ -16,42 +16,57 @@
-module(rabbit_auth_backend).
+-ifdef(use_specs).
+
+%% A description proplist as with auth mechanisms,
+%% exchanges. Currently unused.
+-callback description() -> [proplist:property()].
+
+%% Check a user can log in, given a username and a proplist of
+%% authentication information (e.g. [{password, Password}]).
+%%
+%% Possible responses:
+%% {ok, User}
+%% Authentication succeeded, and here's the user record.
+%% {error, Error}
+%% Something went wrong. Log and die.
+%% {refused, Msg, Args}
+%% Client failed authentication. Log and die.
+-callback check_user_login(rabbit_types:username(), [term()]) ->
+ {'ok', rabbit_types:user()} |
+ {'refused', string(), [any()]} |
+ {'error', any()}.
+
+%% Given #user and vhost, can a user log in to a vhost?
+%% Possible responses:
+%% true
+%% false
+%% {error, Error}
+%% Something went wrong. Log and die.
+-callback check_vhost_access(rabbit_types:user(), rabbit_types:vhost()) ->
+ boolean() | {'error', any()}.
+
+
+%% Given #user, resource and permission, can a user access a resource?
+%%
+%% Possible responses:
+%% true
+%% false
+%% {error, Error}
+%% Something went wrong. Log and die.
+-callback check_resource_access(rabbit_types:user(),
+ rabbit_types:r(atom()),
+ rabbit_access_control:permission_atom()) ->
+ boolean() | {'error', any()}.
+
+-else.
+
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [
- %% A description proplist as with auth mechanisms,
- %% exchanges. Currently unused.
- {description, 0},
-
- %% Check a user can log in, given a username and a proplist of
- %% authentication information (e.g. [{password, Password}]).
- %%
- %% Possible responses:
- %% {ok, User}
- %% Authentication succeeded, and here's the user record.
- %% {error, Error}
- %% Something went wrong. Log and die.
- %% {refused, Msg, Args}
- %% Client failed authentication. Log and die.
- {check_user_login, 2},
-
- %% Given #user and vhost, can a user log in to a vhost?
- %% Possible responses:
- %% true
- %% false
- %% {error, Error}
- %% Something went wrong. Log and die.
- {check_vhost_access, 2},
-
- %% Given #user, resource and permission, can a user access a resource?
- %%
- %% Possible responses:
- %% true
- %% false
- %% {error, Error}
- %% Something went wrong. Log and die.
- {check_resource_access, 3}
- ];
+ [{description, 0}, {check_user_login, 2}, {check_vhost_access, 2},
+ {check_resource_access, 3}];
behaviour_info(_Other) ->
undefined.
+
+-endif.
View
56 src/rabbit_auth_mechanism.erl
@@ -16,31 +16,41 @@
-module(rabbit_auth_mechanism).
+-ifdef(use_specs).
+
+%% A description.
+-callback description() -> [proplist:property()].
+
+%% If this mechanism is enabled, should it be offered for a given socket?
+%% (primarily so EXTERNAL can be SSL-only)
+-callback should_offer(rabbit_net:socket()) -> boolean().
+
+%% Called before authentication starts. Should create a state
+%% object to be passed through all the stages of authentication.
+-callback init(rabbit_net:socket()) -> any().
+
+%% Handle a stage of authentication. Possible responses:
+%% {ok, User}
+%% Authentication succeeded, and here's the user record.
+%% {challenge, Challenge, NextState}
+%% Another round is needed. Here's the state I want next time.
+%% {protocol_error, Msg, Args}
+%% Client got the protocol wrong. Log and die.
+%% {refused, Msg, Args}
+%% Client failed authentication. Log and die.
+-callback handle_response(binary(), any()) ->
+ {'ok', rabbit_types:user()} |
+ {'challenge', binary(), any()} |
+ {'protocol_error', string(), [any()]} |
+ {'refused', string(), [any()]}.
+
+-else.
+
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [
- %% A description.
- {description, 0},
-
- %% If this mechanism is enabled, should it be offered for a given socket?
- %% (primarily so EXTERNAL can be SSL-only)
- {should_offer, 1},
-
- %% Called before authentication starts. Should create a state
- %% object to be passed through all the stages of authentication.
- {init, 1},
-
- %% Handle a stage of authentication. Possible responses:
- %% {ok, User}
- %% Authentication succeeded, and here's the user record.
- %% {challenge, Challenge, NextState}
- %% Another round is needed. Here's the state I want next time.
- %% {protocol_error, Msg, Args}
- %% Client got the protocol wrong. Log and die.
- %% {refused, Msg, Args}
- %% Client failed authentication. Log and die.
- {handle_response, 2}
- ];
+ [{description, 0}, {should_offer, 1}, {init, 1}, {handle_response, 2}];
behaviour_info(_Other) ->
undefined.
+
+-endif.
View
351 src/rabbit_backing_queue.erl
@@ -16,164 +16,203 @@
-module(rabbit_backing_queue).
+-ifdef(use_specs).
+
+%% We can't specify a per-queue ack/state with callback signatures
+-type(ack() :: any()).
+-type(state() :: any()).
+
+-type(fetch_result(Ack) ::
+ ('empty' |
+ %% Message, IsDelivered, AckTag, Remaining_Len
+ {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
+-type(is_durable() :: boolean()).
+-type(attempt_recovery() :: boolean()).
+-type(purged_msg_count() :: non_neg_integer()).
+-type(confirm_required() :: boolean()).
+-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
+-type(duration() :: ('undefined' | 'infinity' | number())).
+
+-type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
+ 'undefined').
+-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
+
+%% Called on startup with a list of durable queue names. The queues
+%% aren't being started at this point, but this call allows the
+%% backing queue to perform any checking necessary for the consistency
+%% of those queues, or initialise any other shared resources.
+-callback start([rabbit_amqqueue:name()]) -> 'ok'.
+
+%% Called to tear down any state/resources. NB: Implementations should
+%% not depend on this function being called on shutdown and instead
+%% should hook into the rabbit supervision hierarchy.
+-callback stop() -> 'ok'.
+
+%% Initialise the backing queue and its state.
+%%
+%% Takes
+%% 1. the amqqueue record
+%% 2. a boolean indicating whether the queue is an existing queue that
+%% should be recovered
+%% 3. an asynchronous callback which accepts a function of type
+%% backing-queue-state to backing-queue-state. This callback
+%% function can be safely invoked from any process, which makes it
+%% useful for passing messages back into the backing queue,
+%% especially as the backing queue does not have control of its own
+%% mailbox.
+-callback init(rabbit_types:amqqueue(), attempt_recovery(),
+ async_callback()) -> state().
+
+%% Called on queue shutdown when queue isn't being deleted.
+-callback terminate(any(), state()) -> state().
+
+%% Called when the queue is terminating and needs to delete all its
+%% content.
+-callback delete_and_terminate(any(), state()) -> state().
+
+%% Remove all messages in the queue, but not messages which have been
+%% fetched and are pending acks.
+-callback purge(state()) -> {purged_msg_count(), state()}.
+
+%% Publish a message.
+-callback publish(rabbit_types:basic_message(),
+ rabbit_types:message_properties(), pid(), state()) ->
+ state().
+
+%% Called for messages which have already been passed straight
+%% out to a client. The queue will be empty for these calls
+%% (i.e. saves the round trip through the backing queue).
+-callback publish_delivered(true, rabbit_types:basic_message(),
+ rabbit_types:message_properties(), pid(), state())
+ -> {ack(), state()};
+ (false, rabbit_types:basic_message(),
+ rabbit_types:message_properties(), pid(), state())
+ -> {undefined, state()}.
+
+%% Return ids of messages which have been confirmed since the last
+%% invocation of this function (or initialisation).
+%%
+%% Message ids should only appear in the result of drain_confirmed
+%% under the following circumstances:
+%%
+%% 1. The message appears in a call to publish_delivered/4 and the
+%% first argument (ack_required) is false; or
+%% 2. The message is fetched from the queue with fetch/2 and the first
+%% argument (ack_required) is false; or
+%% 3. The message is acked (ack/2 is called for the message); or
+%% 4. The message is fully fsync'd to disk in such a way that the
+%% recovery of the message is guaranteed in the event of a crash of
+%% this rabbit node (excluding hardware failure).
+%%
+%% In addition to the above conditions, a message id may only appear
+%% in the result of drain_confirmed if
+%% #message_properties.needs_confirming = true when the msg was
+%% published (through whichever means) to the backing queue.
+%%
+%% It is legal for the same message id to appear in the results of
+%% multiple calls to drain_confirmed, which means that the backing
+%% queue is not required to keep track of which messages it has
+%% already confirmed. The confirm will be issued to the publisher the
+%% first time the message id appears in the result of
+%% drain_confirmed. All subsequent appearances of that message id will
+%% be ignored.
+-callback drain_confirmed(state()) -> {[rabbit_guid:guid()], state()}.
+
+%% Drop messages from the head of the queue while the supplied predicate returns
+%% true. Also accepts a boolean parameter that determines whether the messages
+%% necessitate an ack or not. If they do, the function returns a list of
+%% messages with the respective acktags.
+-callback dropwhile(msg_pred(), true, state())
+ -> {[{rabbit_types:basic_message(), ack()}], state()};
+ (msg_pred(), false, state())
+ -> {undefined, state()}.
+
+%% Produce the next message.
+-callback fetch(true, state()) -> {fetch_result(ack()), state()};
+ (false, state()) -> {fetch_result(undefined), state()}.
+
+%% Acktags supplied are for messages which can now be forgotten
+%% about. Must return 1 msg_id per Ack, in the same order as Acks.
+-callback ack([ack()], state()) -> {[rabbit_guid:guid()], state()}.
+
+%% Acktags supplied are for messages which should be processed. The
+%% provided callback function is called with each message.
+-callback fold(msg_fun(), state(), [ack()]) -> state().
+
+%% Reinsert messages into the queue which have already been delivered
+%% and were pending acknowledgement.
+-callback requeue([ack()], state()) -> {[rabbit_guid:guid()], state()}.
+
+%% How long is my queue?
+-callback len(state()) -> non_neg_integer().
+
+%% Is my queue empty?
+-callback is_empty(state()) -> boolean().
+
+%% For the next three functions, the assumption is that you're
+%% monitoring something like the ingress and egress rates of the
+%% queue. The RAM duration is thus the length of time represented by
+%% the messages held in RAM given the current rates. If you want to
+%% ignore all of this stuff, then do so, and return 0 in
+%% ram_duration/1.
+
+%% The target is to have no more messages in RAM than indicated by the
+%% duration and the current queue rates.
+-callback set_ram_duration_target(duration(), state()) -> state().
+
+%% Optionally recalculate the duration internally (likely to be just
+%% update your internal rates), and report how many seconds the
+%% messages in RAM represent given the current rates of the queue.
+-callback ram_duration(state()) -> {duration(), state()}.
+
+%% Should 'timeout' be called as soon as the queue process can manage
+%% (either on an empty mailbox, or when a timer fires)?
+-callback needs_timeout(state()) -> 'false' | 'timed' | 'idle'.
+
+%% Called (eventually) after needs_timeout returns 'idle' or 'timed'.
+%% Note this may be called more than once for each 'idle' or 'timed'
+%% returned from needs_timeout
+-callback timeout(state()) -> state().
+
+%% Called immediately before the queue hibernates.
+-callback handle_pre_hibernate(state()) -> state().
+
+%% Exists for debugging purposes, to be able to expose state via
+%% rabbitmqctl list_queues backing_queue_status
+-callback status(state()) -> [{atom(), any()}].
+
+%% Passed a function to be invoked with the relevant backing queue's
+%% state. Useful for when the backing queue or other components need
+%% to pass functions into the backing queue.
+-callback invoke(atom(), fun ((atom(), A) -> A), state()) -> state().
+
+%% Called prior to a publish or publish_delivered call. Allows the BQ
+%% to signal that it's already seen this message (and in what capacity
+%% - i.e. was it published previously or discarded previously) and
+%% thus the message should be dropped.
+-callback is_duplicate(rabbit_types:basic_message(), state())
+ -> {'false'|'published'|'discarded', state()}.
+
+%% Called to inform the BQ about messages which have reached the
+%% queue, but are not going to be further passed to BQ for some
+%% reason. Note that this is may be invoked for messages for which
+%% BQ:is_duplicate/2 has already returned {'published' | 'discarded',
+%% BQS}.
+-callback discard(rabbit_types:basic_message(), pid(), state()) -> state().
+
+-else.
+
-export([behaviour_info/1]).
behaviour_info(callbacks) ->
- [
- %% Called on startup with a list of durable queue names. The
- %% queues aren't being started at this point, but this call
- %% allows the backing queue to perform any checking necessary for
- %% the consistency of those queues, or initialise any other
- %% shared resources.
- {start, 1},
-
- %% Called to tear down any state/resources. NB: Implementations
- %% should not depend on this function being called on shutdown
- %% and instead should hook into the rabbit supervision hierarchy.
- {stop, 0},
-
- %% Initialise the backing queue and its state.
- %%
- %% Takes
- %% 1. the amqqueue record
- %% 2. a boolean indicating whether the queue is an existing queue
- %% that should be recovered
- %% 3. an asynchronous callback which accepts a function of type
- %% backing-queue-state to backing-queue-state. This callback
- %% function can be safely invoked from any process, which
- %% makes it useful for passing messages back into the backing
- %% queue, especially as the backing queue does not have
- %% control of its own mailbox.
- {init, 3},
-
- %% Called on queue shutdown when queue isn't being deleted.
- {terminate, 2},
-
- %% Called when the queue is terminating and needs to delete all
- %% its content.
- {delete_and_terminate, 2},
-
- %% Remove all messages in the queue, but not messages which have
- %% been fetched and are pending acks.
- {purge, 1},
-
- %% Publish a message.
- {publish, 4},
-
- %% Called for messages which have already been passed straight
- %% out to a client. The queue will be empty for these calls
- %% (i.e. saves the round trip through the backing queue).
- {publish_delivered, 5},
-
- %% Return ids of messages which have been confirmed since
- %% the last invocation of this function (or initialisation).
- %%
- %% Message ids should only appear in the result of
- %% drain_confirmed under the following circumstances:
- %%
- %% 1. The message appears in a call to publish_delivered/4 and
- %% the first argument (ack_required) is false; or
- %% 2. The message is fetched from the queue with fetch/2 and the
- %% first argument (ack_required) is false; or
- %% 3. The message is acked (ack/2 is called for the message); or
- %% 4. The message is fully fsync'd to disk in such a way that the
- %% recovery of the message is guaranteed in the event of a
- %% crash of this rabbit node (excluding hardware failure).
- %%
- %% In addition to the above conditions, a message id may only
- %% appear in the result of drain_confirmed if
- %% #message_properties.needs_confirming = true when the msg was
- %% published (through whichever means) to the backing queue.
- %%
- %% It is legal for the same message id to appear in the results
- %% of multiple calls to drain_confirmed, which means that the
- %% backing queue is not required to keep track of which messages
- %% it has already confirmed. The confirm will be issued to the
- %% publisher the first time the message id appears in the result
- %% of drain_confirmed. All subsequent appearances of that message
- %% id will be ignored.
- {drain_confirmed, 1},
-
- %% Drop messages from the head of the queue while the supplied
- %% predicate returns true. A callback function is supplied
- %% allowing callers access to messages that are about to be
- %% dropped.
- {dropwhile, 3},
-
- %% Produce the next message.
- {fetch, 2},
-
- %% Acktags supplied are for messages which can now be forgotten
- %% about. Must return 1 msg_id per Ack, in the same order as
- %% Acks.
- {ack, 2},
-
- %% Acktags supplied are for messages which should be
- %% processed. The provided callback function is called with each
- %% message.
- {fold, 3},
-
- %% Reinsert messages into the queue which have already been
- %% delivered and were pending acknowledgement.
- {requeue, 2},
-
- %% How long is my queue?
- {len, 1},
-
- %% Is my queue empty?
- {is_empty, 1},
-
- %% For the next three functions, the assumption is that you're
- %% monitoring something like the ingress and egress rates of the
- %% queue. The RAM duration is thus the length of time represented
- %% by the messages held in RAM given the current rates. If you
- %% want to ignore all of this stuff, then do so, and return 0 in
- %% ram_duration/1.
-
- %% The target is to have no more messages in RAM than indicated
- %% by the duration and the current queue rates.
- {set_ram_duration_target, 2},
-
- %% Optionally recalculate the duration internally (likely to be
- %% just update your internal rates), and report how many seconds
- %% the messages in RAM represent given the current rates of the
- %% queue.
- {ram_duration, 1},
-
- %% Should 'timeout' be called as soon as the queue process
- %% can manage (either on an empty mailbox, or when a timer
- %% fires)?
- {needs_timeout, 1},
-
- %% Called (eventually) after needs_timeout returns 'idle' or
- %% 'timed'. Note this may be called more than once for each
- %% 'idle' or 'timed' returned from needs_timeout.
- {timeout, 1},
-
- %% Called immediately before the queue hibernates.
- {handle_pre_hibernate, 1},
-
- %% Exists for debugging purposes, to be able to expose state via
- %% rabbitmqctl list_queues backing_queue_status
- {status, 1},
-
- %% Passed a function to be invoked with the relevant backing
- %% queue's state. Useful for when the backing queue or other
- %% components need to pass functions into the backing queue.
- {invoke, 3},
-
- %% Called prior to a publish or publish_delivered call. Allows
- %% the BQ to signal that it's already seen this message (and in
- %% what capacity - i.e. was it published previously or discarded
- %% previously) and thus the message should be dropped.
- {is_duplicate, 2},
-
- %% Called to inform the BQ about messages which have reached the
- %% queue, but are not going to be further passed to BQ for some
- %% reason. Note that this is may be invoked for messages for
- %% which BQ:is_duplicate/2 has already returned {'published' |
- %% 'discarded', BQS}.
- {discard, 3}
- ];
+ [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
+ {delete_and_terminate, 2}, {purge, 1}, {publish, 4},
+ {publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3},
+ {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
+ {is_empty, 1}, {set_ram_duration_target, 2}, {ram_duration, 1},
+ {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1},
+ {status, 1}, {invoke, 3}, {is_duplicate, 2}, {discard, 3}];
behaviour_info(_Other) ->
undefined.
+
+-endif.
View
19 src/rabbit_basic.erl
@@ -20,7 +20,7 @@
-export([publish/4, publish/6, publish/1,
message/3, message/4, properties/1, append_table_header/3,
- extract_headers/1, replace_headers/2, delivery/4, header_routes/1]).
+ extract_headers/1, map_headers/2, delivery/4, header_routes/1]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -63,8 +63,8 @@
-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()).
--spec(replace_headers/2 :: (headers(), rabbit_types:content())
- -> rabbit_types:content()).
+-spec(map_headers/2 :: (fun((headers()) -> headers()), rabbit_types:content())
+ -> rabbit_types:content()).
-spec(header_routes/1 ::
(undefined | rabbit_framing:amqp_table()) -> [string()]).
@@ -193,15 +193,18 @@ extract_headers(Content) ->
rabbit_binary_parser:ensure_content_decoded(Content),
Headers.
-replace_headers(Headers, Content = #content{properties = Props}) ->
+map_headers(F, Content) ->
+ Content1 = rabbit_binary_parser:ensure_content_decoded(Content),
+ #content{properties = #'P_basic'{headers = Headers} = Props} = Content1,
+ Headers1 = F(Headers),
rabbit_binary_generator:clear_encoded_content(
- Content#content{properties = Props#'P_basic'{headers = Headers}}).
+ Content1#content{properties = Props#'P_basic'{headers = Headers1}}).
indexof(L, Element) -> indexof(L, Element, 1).
-indexof([], _Element, _N) -> 0;
-indexof([Element | _Rest], Element, N) -> N;
-indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1).
+indexof([], _Element, _N) -> 0;
+indexof([Element | _Rest], Element, N) -> N;
+indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1).
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
View
170 src/rabbit_channel.erl
@@ -36,9 +36,9 @@
conn_name, limiter, tx_status, next_tag, unacked_message_q,
uncommitted_message_q, uncommitted_acks, uncommitted_nacks, user,
virtual_host, most_recently_declared_queue, queue_monitors,
- consumer_mapping, blocking, queue_consumers, queue_collector_pid,
- stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
- unconfirmed_qm, confirmed, capabilities, trace_state}).
+ consumer_mapping, blocking, queue_consumers, delivering_queues,
+ queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
+ unconfirmed, confirmed, capabilities, trace_state}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -194,15 +194,15 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- queue_monitors = sets:new(),
+ queue_monitors = pmon:new(),
consumer_mapping = dict:new(),
blocking = sets:new(),
queue_consumers = dict:new(),
+ delivering_queues = sets:new(),
queue_collector_pid = CollectorPid,
confirm_enabled = false,
publish_seqno = 1,
- unconfirmed_mq = gb_trees:empty(),
- unconfirmed_qm = gb_trees:empty(),
+ unconfirmed = dtree:empty(),
confirmed = [],
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost)},
@@ -332,10 +332,11 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
State2 = queue_blocked(QPid, State1),
State3 = handle_consuming_queue_down(QPid, State2),
+ State4 = handle_delivering_queue_down(QPid, State3),
credit_flow:peer_down(QPid),
erase_queue_stats(QPid),
- noreply(State3#ch{queue_monitors =
- sets:del_element(QPid, State3#ch.queue_monitors)});
+ noreply(State3#ch{queue_monitors = pmon:erase(
+ QPid, State4#ch.queue_monitors)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -548,45 +549,9 @@ record_confirms(MXs, State = #ch{confirmed = C}) ->
confirm([], _QPid, State) ->
State;
-confirm(MsgSeqNos, QPid, State) ->
- {MXs, State1} = process_confirms(MsgSeqNos, QPid, false, State),
- record_confirms(MXs, State1).
-
-process_confirms(MsgSeqNos, QPid, Nack, State) ->
- lists:foldl(
- fun(MsgSeqNo, {_MXs, _State = #ch{unconfirmed_mq = UMQ0}} = Acc) ->
- case gb_trees:lookup(MsgSeqNo, UMQ0) of
- {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
- Acc, Nack);
- none -> Acc
- end
- end, {[], State}, MsgSeqNos).
-
-remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs},
- {MXs, State = #ch{unconfirmed_mq = UMQ,
- unconfirmed_qm = UQM}},
- Nack) ->
- State1 = case gb_trees:lookup(QPid, UQM) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
- case gb_sets:is_empty(MsgSeqNos1) of
- true -> UQM1 = gb_trees:delete(QPid, UQM),
- State#ch{unconfirmed_qm = UQM1};
- false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
- State#ch{unconfirmed_qm = UQM1}
- end;
- none ->
- State
- end,
- Qs1 = gb_sets:del_element(QPid, Qs),
- %% If QPid somehow died initiating a nack, clear the message from
- %% internal data-structures. Also, cleanup empty entries.
- case (Nack orelse gb_sets:is_empty(Qs1)) of
- true -> UMQ1 = gb_trees:delete(MsgSeqNo, UMQ),
- {[{MsgSeqNo, XName} | MXs], State1#ch{unconfirmed_mq = UMQ1}};
- false -> UMQ1 = gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ),
- {MXs, State1#ch{unconfirmed_mq = UMQ1}}
- end.
+confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
+ {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC),
+ record_confirms(MXs, State#ch{unconfirmed = UC1}).
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -694,7 +659,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, _QPid, _MsgId, Redelivered,
+ Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}}} ->
@@ -706,7 +671,8 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- {noreply, record_sent(none, not(NoAck), Msg, State)};
+ State1 = monitor_delivering_queue(NoAck, QPid, State),
+ {noreply, record_sent(none, not(NoAck), Msg, State1)};
empty ->
{reply, #'basic.get_empty'{}, State}
end;
@@ -744,10 +710,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
consumer_tag = ActualConsumerTag})),
Q}
end) of
- {ok, Q} ->
- State1 = State#ch{consumer_mapping =
- dict:store(ActualConsumerTag, Q,
- ConsumerMapping)},
+ {ok, Q = #amqqueue{pid = QPid}} ->
+ CM1 = dict:store(ActualConsumerTag, Q, ConsumerMapping),
+ State1 = monitor_delivering_queue(
+ NoAck, QPid, State#ch{consumer_mapping = CM1}),
{noreply,
case NoWait of
true -> consumer_monitor(ActualConsumerTag, State1);
@@ -795,9 +761,7 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
fun () -> {error, not_found} end,
fun () ->
rabbit_amqqueue:basic_cancel(
- Q, self(), ConsumerTag,
- ok_msg(NoWait, #'basic.cancel_ok'{
- consumer_tag = ConsumerTag}))
+ Q, self(), ConsumerTag, ok_msg(NoWait, OkMsg))
end) of
ok ->
{noreply, NewState};
@@ -974,15 +938,15 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
{error, not_found} ->
case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
Args, Owner) of
- {new, Q = #amqqueue{}} ->
+ {new, #amqqueue{pid = QPid}} ->
%% We need to notify the reader within the channel
%% process so that we can be sure there are no
%% outstanding exclusive queues being declared as
%% the connection shuts down.
ok = case Owner of
none -> ok;
_ -> rabbit_queue_collector:register(
- CollectorPid, Q)
+ CollectorPid, QPid)
end,
return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
{existing, _Q} ->
@@ -1128,6 +1092,7 @@ handle_method(_MethodRecord, _Content, _State) ->
consumer_monitor(ConsumerTag,
State = #ch{consumer_mapping = ConsumerMapping,
+ queue_monitors = QMons,
queue_consumers = QCons,
capabilities = Capabilities}) ->
case rabbit_misc:table_lookup(
@@ -1140,35 +1105,27 @@ consumer_monitor(ConsumerTag,
end,
gb_sets:singleton(ConsumerTag),
QCons),
- monitor_queue(QPid, State#ch{queue_consumers = QCons1});
+ State#ch{queue_monitors = pmon:monitor(QPid, QMons),
+ queue_consumers = QCons1};
_ ->
State
end.
-monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case sets:is_element(QPid, QMons) of
- false -> erlang:monitor(process, QPid),
- State#ch{queue_monitors = sets:add_element(QPid, QMons)};
- true -> State
+monitor_delivering_queue(true, _QPid, State) ->
+ State;
+monitor_delivering_queue(false, QPid, State = #ch{queue_monitors = QMons,
+ delivering_queues = DQ}) ->
+ State#ch{queue_monitors = pmon:monitor(QPid, QMons),
+ delivering_queues = sets:add_element(QPid, DQ)}.
+
+handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC}) ->
+ case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> {MXs, UC1} = dtree:take_all(QPid, UC),
+ send_nacks(MXs, State#ch{unconfirmed = UC1});
+ false -> {MXs, UC1} = dtree:take(QPid, UC),
+ record_confirms(MXs, State#ch{unconfirmed = UC1})
end.
-handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
- MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
- {value, MsgSet} -> gb_sets:to_list(MsgSet);
- none -> []
- end,
- %% We remove the MsgSeqNos from UQM before calling
- %% process_confirms to prevent each MsgSeqNo being removed from
- %% the set one by one which which would be inefficient
- State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)},
- {Nack, SendFun} =
- case rabbit_misc:is_abnormal_termination(Reason) of
- true -> {true, fun send_nacks/2};
- false -> {false, fun record_confirms/2}
- end,
- {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1),
- SendFun(MXs, State2).
-
handle_consuming_queue_down(QPid,
State = #ch{consumer_mapping = ConsumerMapping,
queue_consumers = QCons,
@@ -1187,6 +1144,9 @@ handle_consuming_queue_down(QPid,
State#ch{consumer_mapping = ConsumerMapping1,
queue_consumers = dict:erase(QPid, QCons)}.
+handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
+ State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
+
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
@@ -1322,9 +1282,11 @@ new_tx(State) -> State#ch{uncommitted_message_q = queue:new(),
notify_queues(State = #ch{state = closing}) ->
{ok, State};
-notify_queues(State = #ch{consumer_mapping = Consumers}) ->
- {rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()),
- State#ch{state = closing}}.
+notify_queues(State = #ch{consumer_mapping = Consumers,
+ delivering_queues = DQ }) ->
+ QPids = sets:to_list(
+ sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
+ {rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}.
fold_per_queue(_F, Acc, []) ->
Acc;
@@ -1370,7 +1332,9 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
QNames}, State) ->
{RoutingRes, DeliveredQPids} =
rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery),
- State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids),
+ State1 = State#ch{queue_monitors =
+ pmon:monitor_all(DeliveredQPids,
+ State#ch.queue_monitors)},
State2 = process_routing_result(RoutingRes, DeliveredQPids,
XName, MsgSeqNo, Message, State1),
maybe_incr_stats([{XName, 1} |
@@ -1392,30 +1356,16 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
process_routing_result(routed, _, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
- #ch{unconfirmed_mq = UMQ} = State,
- UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ),
- SingletonSet = gb_sets:singleton(MsgSeqNo),
- lists:foldl(
- fun (QPid, State0 = #ch{unconfirmed_qm = UQM}) ->
- case gb_trees:lookup(QPid, UQM) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
- UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
- State0#ch{unconfirmed_qm = UQM1};
- none ->
- UQM1 = gb_trees:insert(QPid, SingletonSet, UQM),
- State0#ch{unconfirmed_qm = UQM1}
- end
- end, State#ch{unconfirmed_mq = UMQ1}, QPids).
+ State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
+ State#ch.unconfirmed)}.
send_nacks([], State) ->
State;
send_nacks(MXs, State = #ch{tx_status = none}) ->
- MsgSeqNos = [ MsgSeqNo || {MsgSeqNo, _} <- MXs ],
- coalesce_and_send(MsgSeqNos,
+ coalesce_and_send([MsgSeqNo || {MsgSeqNo, _} <- MXs],
fun(MsgSeqNo, Multiple) ->
#'basic.nack'{delivery_tag = MsgSeqNo,
- multiple = Multiple}
+ multiple = Multiple}
end, State);
send_nacks(_, State) ->
maybe_complete_tx(State#ch{tx_status = failed}).
@@ -1445,11 +1395,11 @@ send_confirms(Cs, State) ->
end, State).
coalesce_and_send(MsgSeqNos, MkMsgFun,
- State = #ch{writer_pid = WriterPid, unconfirmed_mq = UMQ}) ->
+ State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
SMsgSeqNos = lists:usort(MsgSeqNos),
- CutOff = case gb_trees:is_empty(UMQ) of
+ CutOff = case dtree:is_empty(UC) of
true -> lists:last(SMsgSeqNos) + 1;
- false -> {SeqNo, _XQ} = gb_trees:smallest(UMQ), SeqNo
+ false -> {SeqNo, _XName} = dtree:smallest(UC), SeqNo
end,
{Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
case Ms of
@@ -1463,8 +1413,8 @@ coalesce_and_send(MsgSeqNos, MkMsgFun,
maybe_complete_tx(State = #ch{tx_status = in_progress}) ->
State;
-maybe_complete_tx(State = #ch{unconfirmed_mq = UMQ}) ->
- case gb_trees:is_empty(UMQ) of
+maybe_complete_tx(State = #ch{unconfirmed = UC}) ->
+ case dtree:is_empty(UC) of
false -> State;
true -> complete_tx(State#ch{confirmed = []})
end.
@@ -1492,8 +1442,8 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE;
i(name, State) -> name(State);
i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
-i(messages_unconfirmed, #ch{unconfirmed_mq = UMQ}) ->
- gb_trees:size(UMQ);
+i(messages_unconfirmed, #ch{unconfirmed = UC}) ->
+ dtree:size(UC);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
queue:len(UAMQ);
i(messages_uncommitted, #ch{uncommitted_message_q = TMQ}) ->
View
66 src/rabbit_exchange_type.erl
@@ -16,39 +16,57 @@
-module(rabbit_exchange_type).
--export([behaviour_info/1]).
+-ifdef(use_specs).
-behaviour_info(callbacks) ->
- [
- {description, 0},
+-type(tx() :: 'transaction' | 'none').
+-type(serial() :: pos_integer() | tx()).
+
+-callback description() -> [proplist:property()].
+
+%% Should Rabbit ensure that all binding events that are
+%% delivered to an individual exchange can be serialised? (they
+%% might still be delivered out of order, but there'll be a
+%% serial number).
+-callback serialise_events() -> boolean().
- %% Should Rabbit ensure that all binding events that are
- %% delivered to an individual exchange can be serialised? (they
- %% might still be delivered out of order, but there'll be a
- %% serial number).
- {serialise_events, 0},
+%% The no_return is there so that we can have an "invalid" exchange
+%% type (see rabbit_exchange_type_invalid).
+-callback route(rabbit_types:exchange(), rabbit_types:delivery()) ->
+ rabbit_router:match_result().
- {route, 2},
+%% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
+-callback validate(rabbit_types:exchange()) -> 'ok'.
- %% called BEFORE declaration, to check args etc; may exit with #amqp_error{}
- {validate, 1},
+%% called after declaration and recovery
+-callback create(tx(), rabbit_types:exchange()) -> 'ok'.
- %% called after declaration and recovery
- {create, 2},
+%% called after exchange (auto)deletion.
+-callback delete(tx(), rabbit_types:exchange(), [rabbit_types:binding()]) ->
+ 'ok'.
- %% called after exchange (auto)deletion.
- {delete, 3},
+%% called after a binding has been added or recovered
+-callback add_binding(serial(), rabbit_types:exchange(),
+ rabbit_types:binding()) -> 'ok'.
- %% called after a binding has been added or recovered
- {add_binding, 3},
+%% called after bindings have been deleted.
+-callback remove_bindings(serial(), rabbit_types:exchange(),
+ [rabbit_types:binding()]) -> 'ok'.
- %% called after bindings have been deleted.
- {remove_bindings, 3},
+%% called when comparing exchanges for equivalence - should return ok or
+%% exit with #amqp_error{}
+-callback assert_args_equivalence (rabbit_types:exchange(),
+ rabbit_framing:amqp_table()) ->
+ 'ok' | rabbit_types:connection_exit().
- %% called when comparing exchanges for equivalence - should return ok or
- %% exit with #amqp_error{}
- {assert_args_equivalence, 2}
+-else.
- ];
+-export([behaviour_info/1]).
+
+behaviour_info(callbacks) ->
+ [{description, 0}, {serialise_events, 0}, {route, 2}, {validate, 1},
+ {create, 2}, {delete, 3}, {add_binding, 3}, {remove_bindings, 3},
+ {assert_args_equivalence, 2}];
behaviour_info(_Other) ->
undefined.
+
+-endif.
View
108 src/rabbit_framing_amqp_0_8.erl
@@ -136,60 +136,60 @@
| #'test.string_ok'{} | #'test.table'{} | #'test.table_ok'{} | #'test.content'{}
| #'test.content_ok'{} )).
-type(amqp_method_field_name() ::
- ( challenge | ticket | mandatory | routing_key
- | string_1 | nowait | redelivered | operation
- | requeue | queue | global | queue
- | no_ack | nowait | reply_text | internal
- | delivery_tag | mandatory | consumer_count | integer_3
- | exclusive | consumer_tag | consumer_tag | routing_key
- | message_count | prefetch_size | channel_max | exchange
- | ticket | meta_data | exchange | ticket
- | frame_max | string_op | consumer_tag | read
- | durable | nowait | ticket | reply_code
- | integer_4 | global | details | exclusive
- | delivery_tag | no_local | arguments | immediate
- | auto_delete | exchange | consumer_tag | delivery_tag
- | known_hosts | method_id | requeue | frame_max
- | exclusive | response | exchange | staged_size
- | multiple | client_properties | immediate | version_minor
- | operation | locale | ticket | routing_key
- | nowait | version_major | ticket | identifier
- | active | consumer_tag | queue | active
- | routing_key | table | queue | nowait
- | channel_max | queue | queue | prefetch_count
- | identifier | identifier | queue | heartbeat
- | class_id | reply_text | passive | nowait
- | exchange | if_unused | ticket | redelivered
- | reply_text | routing_key | realm | response
- | reply_code | delivery_tag | reply_code | nowait
- | ticket | passive | no_local | integer_result
- | arguments | consumer_tag | host | routing_key
- | message_count | ticket | exchange | global
- | ticket | reply_text | no_ack | integer_2
- | nowait | ticket | mandatory | integer_op
- | result | exchange | requeue | prefetch_size
- | consumer_tag | exclusive | content_size | reply_text
- | mechanism | exchange | exchange | active
- | reply_code | no_local | server_properties | result
- | consumer_tag | known_hosts | ticket | exchange
- | dtx_identifier | consumer_tag | virtual_host | type
- | exchange | reply_text | multiple | auto_delete
- | nowait | reply_code | arguments | consumer_tag
- | mechanisms | arguments | delivery_tag | prefetch_size
- | consumer_tag | class_id | consumer_tag | message_count
- | exchange | queue | exclusive | delivery_tag
- | queue | string_2 | immediate | cluster_id
- | passive | nowait | content_checksum | prefetch_count
- | if_unused | string_result | consumer_tag | heartbeat
- | queue | integer_1 | routing_key | consumer_tag
- | routing_key | ticket | exchange | if_empty
- | prefetch_count | exchange | ticket | locales
- | queue | routing_key | method_id | message_count
- | ticket | routing_key | reply_code | requeue
- | out_of_band | write | delivery_tag | durable
- | delivery_tag | insist | capabilities | redelivered
- | consume_rate | no_ack | routing_key | nowait
- | consumer_tag | nowait )).
+ ( consumer_tag | prefetch_count | message_count | exclusive
+ | delivery_tag | string_2 | immediate | cluster_id
+ | ticket | content_checksum | ticket | no_local
+ | routing_key | frame_max | routing_key | out_of_band
+ | routing_key | durable | if_empty | exchange
+ | nowait | queue | routing_key | nowait
+ | exchange | routing_key | result | ticket
+ | routing_key | ticket | requeue | active
+ | channel_max | exchange | delivery_tag | queue
+ | consumer_tag | integer_3 | operation | consume_rate
+ | reply_text | consumer_tag | consumer_count | nowait
+ | global | reply_text | nowait | redelivered
+ | mandatory | operation | message_count | auto_delete
+ | nowait | channel_max | string_op | message_count
+ | method_id | delivery_tag | write | mandatory
+ | integer_4 | ticket | integer_result | nowait
+ | consumer_tag | nowait | nowait | prefetch_size
+ | details | passive | ticket | no_local
+ | meta_data | ticket | class_id | reply_code
+ | ticket | consumer_tag | known_hosts | routing_key
+ | exchange | frame_max | exclusive | reply_code
+ | response | global | consumer_tag | exclusive
+ | delivery_tag | read | no_ack | delivery_tag
+ | reply_code | exchange | version_major | if_unused
+ | reply_code | identifier | requeue | exclusive
+ | active | exclusive | nowait | consumer_tag
+ | staged_size | method_id | immediate | realm
+ | prefetch_count | type | ticket | routing_key
+ | heartbeat | arguments | content_size | ticket
+ | passive | reply_code | integer_1 | consumer_tag
+ | mechanism | routing_key | table | reply_text
+ | delivery_tag | arguments | queue | version_minor
+ | consumer_tag | identifier | identifier | reply_code
+ | client_properties | ticket | auto_delete | reply_text
+ | ticket | queue | arguments | insist
+ | host | reply_text | multiple | routing_key
+ | ticket | queue | redelivered | delivery_tag
+ | durable | response | nowait | exchange
+ | string_result | server_properties | routing_key | consumer_tag
+ | prefetch_size | message_count | reply_text | immediate
+ | arguments | exchange | active | ticket
+ | no_ack | integer_2 | no_local | nowait
+ | no_ack | consumer_tag | mandatory | integer_op
+ | queue | capabilities | requeue | prefetch_size
+ | result | exchange | exchange | queue
+ | challenge | exchange | locale | exchange
+ | redelivered | nowait | requeue | known_hosts
+ | queue | consumer_tag | mechanisms | ticket
+ | exchange | class_id | heartbeat | exchange
+ | exchange | multiple | queue | queue
+ | locales | string_1 | consumer_tag | passive
+ | if_unused | virtual_host | delivery_tag | prefetch_count
+ | queue | internal | consumer_tag | global
+ | consumer_tag | dtx_identifier )).
-type(amqp_property_record() ::
( #'P_connection'{} | #'P_channel'{} | #'P_access'{} | #'P_exchange'{}
| #'P_queue'{} | #'P_basic'{} | #'P_file'{} | #'P_stream'{}
View
74 src/rabbit_framing_amqp_0_9_1.erl
@@ -115,43 +115,43 @@
| #'tx.commit'{} | #'tx.commit_ok'{} | #'tx.rollback'{} | #'tx.rollback_ok'{}
| #'confirm.select'{} | #'confirm.select_ok'{} )).
-type(amqp_method_field_name() ::
- ( nowait | source | routing_key | ticket
- | arguments | exchange | reply_text | ticket
- | heartbeat | consumer_tag | queue | source
- | exchange | redelivered | requeue | type
- | multiple | version_major | locales | challenge
- | arguments | ticket | method_id | global
- | no_ack | out_of_band | arguments | consumer_tag
- | reply_code | delivery_tag | ticket | message_count
- | message_count | channel_max | response | ticket
- | version_minor | queue | prefetch_count | consumer_tag
- | mechanisms | queue | ticket | queue
- | ticket | exclusive | reply_code | delivery_tag
- | delivery_tag | requeue | ticket | locale
- | routing_key | virtual_host | frame_max | queue
- | consumer_count | durable | message_count | ticket
- | exclusive | client_properties | realm | mandatory
- | requeue | passive | server_properties | queue
- | channel_id | reply_text | arguments | destination
- | capabilities | passive | if_unused | heartbeat
- | queue | consumer_tag | auto_delete | arguments
- | reply_text | multiple | exchange | routing_key
- | redelivered | exclusive | routing_key | durable
- | nowait | ticket | ticket | method_id
- | reply_code | nowait | routing_key | immediate
- | cluster_id | if_unused | mechanism | no_ack
- | internal | routing_key | message_count | passive
- | destination | class_id | response | if_empty
- | exchange | active | read | ticket
- | exchange | routing_key | requeue | exchange
- | nowait | nowait | consumer_tag | channel_max
- | nowait | exchange | nowait | active
- | insist | arguments | auto_delete | queue
- | delivery_tag | delivery_tag | prefetch_size | nowait
- | nowait | class_id | known_hosts | frame_max
- | active | nowait | nowait | no_local
- | arguments | ticket | exchange | write
- | routing_key )).
+ ( passive | message_count | nowait | message_count
+ | queue | active | consumer_tag | routing_key
+ | mechanisms | no_ack | arguments | nowait
+ | prefetch_count | channel_max | if_empty | ticket
+ | nowait | message_count | requeue | nowait
+ | locale | active | method_id | queue
+ | ticket | ticket | ticket | prefetch_size
+ | mandatory | reply_code | nowait | known_hosts
+ | server_properties | nowait | frame_max | no_local
+ | response | if_unused | arguments | write
+ | response | exclusive | nowait | nowait
+ | global | requeue | arguments | requeue
+ | queue | active | arguments | class_id
+ | exchange | arguments | heartbeat | ticket
+ | queue | routing_key | queue | consumer_tag
+ | type | queue | version_major | reply_text
+ | multiple | routing_key | arguments | mechanism
+ | redelivered | virtual_host | read | queue
+ | message_count | delivery_tag | out_of_band | delivery_tag
+ | reply_code | durable | ticket | channel_max
+ | method_id | consumer_tag | passive | no_ack
+ | version_minor | nowait | consumer_tag | nowait
+ | destination | client_properties | reply_code | nowait
+ | delivery_tag | insist | exchange | frame_max
+ | ticket | ticket | exchange | reply_text
+ | passive | immediate | ticket | durable
+ | destination | ticket | realm | arguments
+ | reply_text | ticket | delivery_tag | channel_id
+ | delivery_tag | cluster_id | class_id | source
+ | queue | capabilities | exchange | exchange
+ | challenge | auto_delete | exchange | consumer_count
+ | routing_key | exclusive | exchange | consumer_tag
+ | routing_key | heartbeat | routing_key | exclusive
+ | exchange | locales | source | routing_key
+ | auto_delete | ticket | if_unused | internal
+ | ticket | routing_key | redelivered | requeue
+ | multiple )).
-type(amqp_property_record() ::
( #'P_connection'{} | #'P_channel'{} | #'P_access'{} | #'P_exchange'{}
| #'P_queue'{} | #'P_basic'{} | #'P_tx'{} | #'P_confirm'{} )).
View
27 src/rabbit_misc.erl
@@ -46,8 +46,7 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
--export([dict_cons/3, orddict_cons/3, gb_trees_cons/3,
- gb_trees_set_insert/3]).
+-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
-export([gb_trees_fold/3, gb_trees_foreach/2]).
-export([get_options/2]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
@@ -60,6 +59,8 @@
-export([append_rpc_all_nodes/4]).
-export([multi_call/2]).
-export([quit/1]).
+-export([os_cmd/1]).
+-export([gb_sets_difference/2]).
%%----------------------------------------------------------------------------
@@ -176,7 +177,6 @@
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
-spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()).
--spec(gb_trees_set_insert/3 :: (any(), any(), gb_tree()) -> gb_tree()).
-spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A).
-spec(gb_trees_foreach/2 ::
(fun ((any(), any()) -> any()), gb_tree()) -> 'ok').
@@ -204,6 +204,8 @@
-spec(multi_call/2 ::
([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
-spec(quit/1 :: (integer() | string()) -> no_return()).
+-spec(os_cmd/1 :: (string()) -> string()).
+-spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()).
-endif.
@@ -717,15 +719,6 @@ gb_trees_cons(Key, Value, Tree) ->
none -> gb_trees:insert(Key, [Value], Tree)
end.
-gb_trees_set_insert(Key, Value, Tree) ->
- case gb_trees:lookup(Key, Tree) of
- {value, Values} ->
- Values1 =