Skip to content
Browse files

Upgraded to RabbitMQ 2.4.1

  • Loading branch information...
1 parent e7a5bce commit c6bcba950942ff04e22b1a5cda1e284fd0ba2958 @jbrisbin committed
Showing with 47 additions and 28 deletions.
  1. +31 −7 src/rabbit_backing_queue.erl
  2. +1 −1 src/rabbit_common.app.src
  3. +1 −9 src/rabbit_misc.erl
  4. +14 −11 src/supervisor2.erl
View
38 src/rabbit_backing_queue.erl
@@ -39,13 +39,12 @@ behaviour_info(callbacks) ->
%% 2. a boolean indicating whether the queue is durable
%% 3. a boolean indicating whether the queue is an existing queue
%% that should be recovered
- %% 4. an asynchronous callback which accepts a function from
- %% state to state and invokes it with the current backing
- %% queue state. This is useful for handling events, e.g. when
- %% the backing queue does not have its own process to receive
- %% such events, or when the processing of an event results in
- %% a state transition the queue logic needs to know about
- %% (such as messages getting confirmed).
+ %% 4. an asynchronous callback which accepts a function of type
+ %% backing-queue-state to backing-queue-state. This callback
+ %% function can be safely invoked from any process, which
+ %% makes it useful for passing messages back into the backing
+ %% queue, especially as the backing queue does not have
+ %% control of its own mailbox.
%% 5. a synchronous callback. Same as the asynchronous callback
%% but waits for completion and returns 'error' on error.
{init, 5},
@@ -71,6 +70,31 @@ behaviour_info(callbacks) ->
%% Return ids of messages which have been confirmed since
%% the last invocation of this function (or initialisation).
+ %%
+ %% Message ids should only appear in the result of
+ %% drain_confirmed under the following circumstances:
+ %%
+ %% 1. The message appears in a call to publish_delivered/4 and
+ %% the first argument (ack_required) is false; or
+ %% 2. The message is fetched from the queue with fetch/2 and the
+ %% first argument (ack_required) is false; or
+ %% 3. The message is acked (ack/2 is called for the message); or
+ %% 4. The message is fully fsync'd to disk in such a way that the
+ %% recovery of the message is guaranteed in the event of a
+ %% crash of this rabbit node (excluding hardware failure).
+ %%
+ %% In addition to the above conditions, a message id may only
+ %% appear in the result of drain_confirmed if
+ %% #message_properties.needs_confirming = true when the msg was
+ %% published (through whichever means) to the backing queue.
+ %%
+ %% It is legal for the same message id to appear in the results
+ %% of multiple calls to drain_confirmed, which means that the
+ %% backing queue is not required to keep track of which messages
+ %% it has already confirmed. The confirm will be issued to the
+ %% publisher the first time the message id appears in the result
+ %% of drain_confirmed. All subsequent appearances of that message
+ %% id will be ignored.
{drain_confirmed, 1},
%% Drop messages from the head of the queue while the supplied
View
2 src/rabbit_common.app.src
@@ -1,6 +1,6 @@
{application, rabbit_common,
[{description, "RabbitMQ Common Libraries"},
- {vsn, "2.4.0"},
+ {vsn, "2.4.1"},
{modules, []},
{registered, []},
{env, []},
View
10 src/rabbit_misc.erl
@@ -48,8 +48,7 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
--export([recursive_delete/1, recursive_copy/2, dict_cons/3, orddict_cons/3,
- unlink_and_capture_exit/1]).
+-export([recursive_delete/1, recursive_copy/2, dict_cons/3, orddict_cons/3]).
-export([get_options/2]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
@@ -178,7 +177,6 @@
-> rabbit_types:ok_or_error({file:filename(), file:filename(), any()})).
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
--spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok').
-spec(get_options/2 :: ([optdef()], [string()])
-> {[string()], [{string(), any()}]}).
-spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]).
@@ -749,12 +747,6 @@ dict_cons(Key, Value, Dict) ->
orddict_cons(Key, Value, Dict) ->
orddict:update(Key, fun (List) -> [Value | List] end, [Value], Dict).
-unlink_and_capture_exit(Pid) ->
- unlink(Pid),
- receive {'EXIT', Pid, _} -> ok
- after 0 -> ok
- end.
-
%% Separate flags and options from arguments.
%% get_options([{flag, "-q"}, {option, "-p", "/"}],
%% ["set_permissions","-p","/","guest",
View
25 src/supervisor2.erl
@@ -38,6 +38,9 @@
%% child is a supervisor and it exits normally (i.e. with reason of
%% 'shutdown') then the child's parent also exits normally.
%%
+%% 5) normal, and {shutdown, _} exit reasons are all treated the same
+%% (i.e. are regarded as normal exits)
+%%
%% All modifications are (C) 2010-2011 VMware, Inc.
%%
%% %CopyrightBegin%
@@ -544,17 +547,12 @@ do_restart({RestartType, Delay}, Reason, Child, State) ->
do_restart(permanent, Reason, Child, State) ->
report_error(child_terminated, Reason, Child, State#state.name),
restart(Child, State);
-do_restart(intrinsic, normal, Child, State) ->
- {shutdown, state_del_child(Child, State)};
-do_restart(intrinsic, shutdown, Child = #child{child_type = supervisor},
- State) ->
- {shutdown, state_del_child(Child, State)};
-do_restart(_, normal, Child, State) ->
- NState = state_del_child(Child, State),
- {ok, NState};
-do_restart(_, shutdown, Child, State) ->
- NState = state_del_child(Child, State),
- {ok, NState};
+do_restart(Type, normal, Child, State) ->
+ del_child_and_maybe_shutdown(Type, Child, State);
+do_restart(Type, {shutdown, _}, Child, State) ->
+ del_child_and_maybe_shutdown(Type, Child, State);
+do_restart(Type, shutdown, Child = #child{child_type = supervisor}, State) ->
+ del_child_and_maybe_shutdown(Type, Child, State);
do_restart(Type, Reason, Child, State) when Type =:= transient orelse
Type =:= intrinsic ->
report_error(child_terminated, Reason, Child, State#state.name),
@@ -564,6 +562,11 @@ do_restart(temporary, Reason, Child, State) ->
NState = state_del_child(Child, State),
{ok, NState}.
+del_child_and_maybe_shutdown(intrinsic, Child, State) ->
+ {shutdown, state_del_child(Child, State)};
+del_child_and_maybe_shutdown(_, Child, State) ->
+ {ok, state_del_child(Child, State)}.
+
restart(Child, State) ->
case add_restart(State) of
{ok, NState} ->

0 comments on commit c6bcba9

Please sign in to comment.
Something went wrong with that request. Please try again.