Skip to content
Browse files

Upgraded to RabbitMQ 2.4.0

  • Loading branch information...
1 parent 1155b1c commit 289466817d8da4817702af5f8dfef636bedcbca2 @jbrisbin committed Mar 24, 2011
View
4 include/amqp_client.hrl
@@ -43,3 +43,7 @@
-define(LOG_DEBUG(Format), error_logger:info_msg(Format)).
-define(LOG_INFO(Format, Args), error_logger:info_msg(Format, Args)).
-define(LOG_WARN(Format, Args), error_logger:warning_msg(Format, Args)).
+-define(CLIENT_CAPABILITIES, [{<<"publisher_confirms">>, bool, true},
+ {<<"exchange_exchange_bindings">>, bool, true},
+ {<<"basic.nack">>, bool, true},
+ {<<"consumer_cancel_notify">>, bool, true}]).
View
91 src/amqp_channel.erl
@@ -25,15 +25,14 @@
-behaviour(gen_server).
--export([start_link/3, connection_closing/2, open/1]).
+-export([start_link/3, connection_closing/3, open/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
-export([call/2, call/3, cast/2, cast/3]).
-export([subscribe/3]).
-export([close/1, close/3]).
--export([register_return_handler/2]).
--export([register_flow_handler/2]).
--export([register_confirm_handler/2]).
+-export([register_return_handler/2, register_flow_handler/2,
+ register_confirm_handler/2]).
-export([next_publish_seqno/1]).
-export([register_default_consumer/2]).
@@ -46,7 +45,8 @@
rpc_requests = queue:new(),
anon_sub_requests = queue:new(),
tagged_sub_requests = dict:new(),
- closing = false,
+ closing = false, %% false | just_channel |
+ %% {connection, Reason}
writer,
return_handler_pid = none,
confirm_handler_pid = none,
@@ -154,7 +154,7 @@ close(Channel, Code, Text) ->
%% @doc When in confirm mode, returns the sequence number of the next
%% message to be published.
next_publish_seqno(Channel) ->
- gen_server:call(Channel, next_publish_seqno).
+ gen_server:call(Channel, next_publish_seqno, infinity).
%%---------------------------------------------------------------------------
%% Consumer registration (API)
@@ -239,8 +239,8 @@ start_link(Driver, ChannelNumber, SWF) ->
gen_server:start_link(?MODULE, [self(), Driver, ChannelNumber, SWF], []).
%% @private
-connection_closing(Pid, ChannelCloseType) ->
- gen_server:cast(Pid, {connection_closing, ChannelCloseType}).
+connection_closing(Pid, ChannelCloseType, Reason) ->
+ gen_server:cast(Pid, {connection_closing, ChannelCloseType, Reason}).
%% @private
open(Pid) ->
@@ -319,14 +319,11 @@ handle_cast({method, Method, Content}, State) ->
%% beforehand. The channel must block all further RPCs,
%% flush the RPC queue (optional), and terminate
%% @private
-handle_cast({connection_closing, CloseType}, State) ->
- handle_connection_closing(CloseType, State);
+handle_cast({connection_closing, CloseType, Reason}, State) ->
+ handle_connection_closing(CloseType, Reason, State);
%% @private
-handle_cast({shutdown, {_, 200, _}}, State) ->
- {stop, normal, State};
-%% @private
-handle_cast({shutdown, Reason}, State) ->
- {stop, Reason, State}.
+handle_cast({shutdown, Shutdown}, State) ->
+ handle_shutdown(Shutdown, State).
%% Received from rabbit_channel in the direct case
%% @private
@@ -344,8 +341,12 @@ handle_info({send_command_and_notify, Q, ChPid, Method, Content}, State) ->
{noreply, State};
%% This comes from the writer or rabbit_channel
%% @private
-handle_info({channel_exit, _FrPidOrChNumber, Reason}, State) ->
+handle_info({channel_exit, _ChNumber, Reason}, State) ->
handle_channel_exit(Reason, State);
+%% This comes from rabbit_channel in the direct case
+handle_info({channel_closing, ChPid}, State) ->
+ ok = rabbit_channel:ready_for_close(ChPid),
+ {noreply, State};
%% @private
handle_info(timed_out_flushing_channel, State) ->
?LOG_WARN("Channel (~p) closing: timed out flushing while "
@@ -382,16 +383,8 @@ handle_info({'DOWN', _, process, DefaultConsumer, Reason},
{noreply, State#state{default_consumer = none}}.
%% @private
-terminate(Reason, #state{rpc_requests = RpcQueue}) ->
- case queue:is_empty(RpcQueue) of
- false -> ?LOG_WARN("Channel (~p): RPC queue was not empty on "
- "terminate~n", [self()]),
- case Reason of
- normal -> exit(rpc_queue_not_empty_on_terminate);
- _ -> ok
- end;
- true -> ok
- end.
+terminate(_Reason, _State) ->
+ ok.
%% @private
code_change(_OldVsn, State, _Extra) ->
@@ -508,9 +501,11 @@ do_rpc(State = #state{rpc_requests = Q,
end;
{empty, NewQ} ->
case Closing of
- connection -> gen_server:cast(self(),
- {shutdown, connection_closing});
- _ -> ok
+ {connection, Reason} ->
+ gen_server:cast(self(),
+ {shutdown, {connection_closing, Reason}});
+ _ ->
+ ok
end,
State#state{rpc_requests = NewQ}
end.
@@ -606,6 +601,12 @@ handle_method_from_server1(
_ -> ReturnHandler ! {BasicReturn, AmqpMsg}
end,
{noreply, State};
+handle_method_from_server1(#'basic.cancel'{consumer_tag = ConsumerTag} = Death,
+ none, State) ->
+ Consumer = resolve_consumer(ConsumerTag, State),
+ Consumer ! Death,
+ NewState = unregister_consumer(ConsumerTag, State),
+ {noreply, NewState};
handle_method_from_server1(#'basic.ack'{} = BasicAck, none,
#state{confirm_handler_pid = none} = State) ->
?LOG_WARN("Channel (~p): received ~p but there is no "
@@ -634,19 +635,21 @@ handle_method_from_server1(Method, Content, State) ->
%% Other handle_* functions
%%---------------------------------------------------------------------------
-handle_connection_closing(CloseType, State = #state{rpc_requests = RpcQueue,
- closing = Closing}) ->
+handle_connection_closing(CloseType, Reason,
+ State = #state{rpc_requests = RpcQueue,
+ closing = Closing}) ->
+ NewState = State#state{closing = {connection, Reason}},
case {CloseType, Closing, queue:is_empty(RpcQueue)} of
{flush, false, false} ->
erlang:send_after(?TIMEOUT_FLUSH, self(),
timed_out_flushing_channel),
- {noreply, State#state{closing = connection}};
+ {noreply, NewState};
{flush, just_channel, false} ->
erlang:send_after(?TIMEOUT_CLOSE_OK, self(),
timed_out_waiting_close_ok),
- {noreply, State#state{closing = connection}};
+ {noreply, NewState};
_ ->
- {stop, connection_closing, State}
+ handle_shutdown({connection_closing, Reason}, NewState)
end.
handle_channel_exit(Reason, State) ->
@@ -656,14 +659,22 @@ handle_channel_exit(Reason, State) ->
?LOG_WARN("Channel (~p) closing: server sent error ~p~n",
[self(), Reason]),
{IsHard, Code, _} = ?PROTOCOL:lookup_amqp_exception(ErrorName),
- {stop, {if IsHard -> server_initiated_hard_close;
- true -> server_initiated_close
- end, Code, Expl}, State};
+ {stop, if IsHard -> {connection_closing,
+ {server_initiated_hard_close, Code, Expl}};
+ true -> {server_initiated_close, Code, Expl}
+ end, State};
%% Unexpected death of a channel infrastructure process
_ ->
{stop, {infrastructure_died, Reason}, State}
end.
+handle_shutdown({_, 200, _}, State) ->
+ {stop, normal, State};
+handle_shutdown({connection_closing, normal}, State) ->
+ {stop, normal, State};
+handle_shutdown(Reason, State) ->
+ {stop, Reason, State}.
+
%%---------------------------------------------------------------------------
%% Internal plumbing
%%---------------------------------------------------------------------------
@@ -719,7 +730,7 @@ build_content(#amqp_msg{props = Props, payload = Payload}) ->
check_block(_Method, _AmqpMsg, #state{closing = just_channel}) ->
closing;
-check_block(_Method, _AmqpMsg, #state{closing = connection}) ->
+check_block(_Method, _AmqpMsg, #state{closing = {connection, _}}) ->
closing;
check_block(_Method, none, #state{}) ->
ok;
@@ -748,9 +759,9 @@ is_connection_method(Method) ->
server_misbehaved(#amqp_error{} = AmqpError, State = #state{number = Number}) ->
case rabbit_binary_generator:map_exception(Number, AmqpError, ?PROTOCOL) of
- {true, _, _} ->
+ {0, _} ->
{stop, {server_misbehaved, AmqpError}, State};
- {false, _, Close} ->
+ {_, Close} ->
?LOG_WARN("Channel (~p) flushing and closing due to soft "
"error caused by the server ~p~n", [self(), AmqpError]),
Self = self(),
View
6 src/amqp_channel_sup.erl
@@ -43,11 +43,13 @@ start_link(Type, InfraArgs, ChNumber) ->
%% Internal plumbing
%%---------------------------------------------------------------------------
-start_writer_fun(_Sup, direct, [Node, User, VHost, Collector], ChNumber) ->
+start_writer_fun(_Sup, direct, [ConnectionPid, Node, User, VHost, Collector],
+ ChNumber) ->
fun () ->
{ok, RabbitCh} =
rpc:call(Node, rabbit_direct, start_channel,
- [ChNumber, self(), User, VHost, Collector]),
+ [ChNumber, self(), ConnectionPid, ?PROTOCOL, User,
+ VHost, ?CLIENT_CAPABILITIES, Collector]),
link(RabbitCh),
{ok, RabbitCh}
end;
View
26 src/amqp_channels_manager.erl
@@ -22,7 +22,7 @@
-behaviour(gen_server).
-export([start_link/2, open_channel/3, set_channel_max/2, is_empty/1,
- num_channels/1, pass_frame/3, signal_connection_closing/2]).
+ num_channels/1, pass_frame/3, signal_connection_closing/3]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
@@ -55,8 +55,8 @@ num_channels(ChMgr) ->
pass_frame(ChMgr, ChNumber, Frame) ->
gen_server:cast(ChMgr, {pass_frame, ChNumber, Frame}).
-signal_connection_closing(ChMgr, ChannelCloseType) ->
- gen_server:cast(ChMgr, {connection_closing, ChannelCloseType}).
+signal_connection_closing(ChMgr, ChannelCloseType, Reason) ->
+ gen_server:cast(ChMgr, {connection_closing, ChannelCloseType, Reason}).
%%---------------------------------------------------------------------------
%% gen_server callbacks
@@ -83,8 +83,8 @@ handle_cast({set_channel_max, ChannelMax}, State) ->
{noreply, State#state{channel_max = ChannelMax}};
handle_cast({pass_frame, ChNumber, Frame}, State) ->
{noreply, internal_pass_frame(ChNumber, Frame, State)};
-handle_cast({connection_closing, ChannelCloseType}, State) ->
- handle_connection_closing(ChannelCloseType, State).
+handle_cast({connection_closing, ChannelCloseType, Reason}, State) ->
+ handle_connection_closing(ChannelCloseType, Reason, State).
handle_info({'DOWN', _, process, Pid, Reason}, State) ->
handle_down(Pid, Reason, State).
@@ -159,11 +159,12 @@ maybe_report_down(_Pid, {app_initiated_close, _, _}, _State) ->
ok;
maybe_report_down(_Pid, {server_initiated_close, _, _}, _State) ->
ok;
-maybe_report_down(_Pid, connection_closing, _State) ->
- ok;
-maybe_report_down(Pid, {server_initiated_hard_close, _, _} = Reason,
+maybe_report_down(Pid, {connection_closing,
+ {server_initiated_hard_close, _, _} = Reason},
#state{connection = Connection}) ->
amqp_gen_connection:hard_error_in_channel(Connection, Pid, Reason);
+maybe_report_down(_Pid, {connection_closing, _}, _State) ->
+ ok;
maybe_report_down(_Pid, {server_misbehaved, AmqpError},
#state{connection = Connection}) ->
amqp_gen_connection:server_misbehaved(Connection, AmqpError);
@@ -179,11 +180,12 @@ check_all_channels_terminated(State = #state{closing = true,
false -> ok
end.
-handle_connection_closing(ChannelCloseType,
+handle_connection_closing(ChannelCloseType, Reason,
State = #state{connection = Connection}) ->
case internal_is_empty(State) of
true -> amqp_gen_connection:channels_terminated(Connection);
- false -> signal_channels_connection_closing(ChannelCloseType, State)
+ false -> signal_channels_connection_closing(ChannelCloseType, Reason,
+ State)
end,
{noreply, State#state{closing = true}}.
@@ -233,7 +235,7 @@ internal_lookup_pn(Pid, #state{map_pid_num = MapPN}) ->
internal_update_npa(Number, Pid, AState, State = #state{map_num_pa = MapNPA}) ->
State#state{map_num_pa = gb_trees:update(Number, {Pid, AState}, MapNPA)}.
-signal_channels_connection_closing(ChannelCloseType,
+signal_channels_connection_closing(ChannelCloseType, Reason,
#state{map_pid_num = MapPN}) ->
- [amqp_channel:connection_closing(Pid, ChannelCloseType)
+ [amqp_channel:connection_closing(Pid, ChannelCloseType, Reason)
|| Pid <- dict:fetch_keys(MapPN)].
View
2 src/amqp_client.app.src
@@ -1,6 +1,6 @@
{application, amqp_client,
[{description, "RabbitMQ AMQP Client"},
- {vsn, "2.3.1"},
+ {vsn, "2.4.0"},
{modules, []},
{registered, [amqp_sup]},
{env, []},
View
5 src/amqp_direct_connection.erl
@@ -42,7 +42,7 @@ open_channel_args(#state{node = Node,
user = User,
vhost = VHost,
collector = Collector}) ->
- [Node, User, VHost, Collector].
+ [self(), Node, User, VHost, Collector].
do(_Method, _State) ->
ok.
@@ -71,7 +71,8 @@ connect(#amqp_params{username = Username,
password = Pass,
node = Node,
virtual_host = VHost}, SIF, _ChMgr, State) ->
- case rpc:call(Node, rabbit_direct, connect, [Username, Pass, VHost]) of
+ case rpc:call(Node, rabbit_direct, connect,
+ [Username, Pass, VHost, ?PROTOCOL]) of
{ok, {User, ServerProperties}} ->
{ok, Collector} = SIF(),
{ok, {ServerProperties, 0, State#state{node = Node,
View
9 src/amqp_gen_connection.erl
@@ -288,21 +288,22 @@ server_initiated_close(Close, State) ->
close = Close}, State).
server_misbehaved_close(AmqpError, State) ->
- {true, 0, Close} =
- rabbit_binary_generator:map_exception(0, AmqpError, ?PROTOCOL),
+ {0, Close} = rabbit_binary_generator:map_exception(0, AmqpError, ?PROTOCOL),
set_closing_state(abrupt, #closing{reason = server_misbehaved,
close = Close}, State).
set_closing_state(ChannelCloseType, NewClosing,
State = #state{channels_manager = ChMgr,
closing = CurClosing}) ->
- amqp_channels_manager:signal_connection_closing(ChMgr, ChannelCloseType),
ResClosing =
case closing_priority(NewClosing) =< closing_priority(CurClosing) of
true -> NewClosing;
false -> CurClosing
end,
- callback(closing, [ChannelCloseType, closing_to_reason(ResClosing)],
+ ClosingReason = closing_to_reason(ResClosing),
+ amqp_channels_manager:signal_connection_closing(ChMgr, ChannelCloseType,
+ ClosingReason),
+ callback(closing, [ChannelCloseType, ClosingReason],
State#state{closing = ResClosing}).
closing_priority(false) -> 99;
View
7 src/amqp_network_connection.erl
@@ -52,7 +52,7 @@ do(Method, State) ->
do2(Method, State).
do2(Method, #state{writer0 = Writer}) ->
- %% Catching because it expects the {channel_exit, _, _} message on error
+ %% Catching because it expects the {channel_exit, _} message on error
catch rabbit_writer:send_command_sync(Writer, Method).
handle_message(timeout_waiting_for_close_ok,
@@ -68,7 +68,7 @@ handle_message(socket_closed, State = #state{waiting_socket_close = false}) ->
{stop, socket_closed_unexpectedly, State};
handle_message({socket_error, _} = SocketError, State) ->
{stop, SocketError, State};
-handle_message({channel_exit, _, Reason}, State) ->
+handle_message({channel_exit, Reason}, State) ->
{stop, {channel0_died, Reason}, State};
handle_message(heartbeat_timeout, State) ->
{stop, heartbeat_timeout, State}.
@@ -230,7 +230,8 @@ client_properties(UserProperties) ->
<<"Copyright (c) 2007-2011 VMware, Inc.">>},
{<<"information">>, longstr,
<<"Licensed under the MPL. "
- "See http://www.rabbitmq.com/">>}],
+ "See http://www.rabbitmq.com/">>},
+ {<<"capabilities">>, table, ?CLIENT_CAPABILITIES}],
lists:foldl(fun({K, _, _} = Tuple, Acc) ->
lists:keystore(K, 1, Acc, Tuple)
end, Default, UserProperties).

0 comments on commit 2894668

Please sign in to comment.
Something went wrong with that request. Please try again.