Permalink
Browse files

Updated to RabbitMQ v2.5.0

  • Loading branch information...
jbrisbin committed Jun 20, 2011
1 parent 2894668 commit c62f4df9f2d5aa2d5218123da32caefac2ccac41
View
@@ -26,19 +26,33 @@
-record(amqp_msg, {props = #'P_basic'{}, payload = <<>>}).
--record(amqp_params, {username = <<"guest">>,
- password = <<"guest">>,
- virtual_host = <<"/">>,
- host = "localhost",
- port = ?PROTOCOL_PORT,
- node = node(),
- channel_max = 0,
- frame_max = 0,
- heartbeat = 0,
- ssl_options = none,
- auth_mechanisms = [fun amqp_auth_mechanisms:plain/3,
- fun amqp_auth_mechanisms:amqplain/3],
- client_properties = []}).
+-record(amqp_params_network, {username = <<"guest">>,
+ password = <<"guest">>,
+ virtual_host = <<"/">>,
+ host = "localhost",
+ port = ?PROTOCOL_PORT,
+ channel_max = 0,
+ frame_max = 0,
+ heartbeat = 0,
+ ssl_options = none,
+ auth_mechanisms =
+ [fun amqp_auth_mechanisms:plain/3,
+ fun amqp_auth_mechanisms:amqplain/3],
+ client_properties = []}).
+
+-record(amqp_params_direct, {username = <<"guest">>,
+ virtual_host = <<"/">>,
+ node = node(),
+ adapter_info = none,
+ client_properties = []}).
+
+-record(adapter_info, {address = unknown,
+ port = unknown,
+ peer_address = unknown,
+ peer_port = unknown,
+ name = unknown,
+ protocol = unknown,
+ additional_info = []}).
-define(LOG_DEBUG(Format), error_logger:info_msg(Format)).
-define(LOG_INFO(Format, Args), error_logger:info_msg(Format, Args)).
View
@@ -1,3 +1,3 @@
{deps, [
- {rabbit_common, ".*", {git, "git://github.com/jbrisbin/rabbit_common.git", "HEAD"}}
+ {rabbit_common, ".*", {git, "git://github.com/jbrisbin/rabbit_common.git", {tag, "rabbitmq_2.5.0"}}}
]}.
@@ -24,14 +24,14 @@
plain(none, _, init) ->
{<<"PLAIN">>, []};
-plain(none, #amqp_params{username = Username,
- password = Password}, _State) ->
+plain(none, #amqp_params_network{username = Username,
+ password = Password}, _State) ->
{<<0, Username/binary, 0, Password/binary>>, _State}.
amqplain(none, _, init) ->
{<<"AMQPLAIN">>, []};
-amqplain(none, #amqp_params{username = Username,
- password = Password}, _State) ->
+amqplain(none, #amqp_params_network{username = Username,
+ password = Password}, _State) ->
LoginTable = [{<<"LOGIN">>, longstr, Username},
{<<"PASSWORD">>, longstr, Password}],
{rabbit_binary_generator:generate_table(LoginTable), _State}.
View
@@ -14,18 +14,59 @@
%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
+%% @type close_reason(Type) = {shutdown, amqp_reason(Type)}.
+%% @type amqp_reason(Type) = {Type, Code, Text}
+%% Code = non_neg_integer()
+%% Text = binary().
%% @doc This module encapsulates the client's view of an AMQP
%% channel. Each server side channel is represented by an amqp_channel
%% process on the client side. Channel processes are created using the
%% {@link amqp_connection} module. Channel processes are supervised
-%% under amqp_client's supervision tree.
+%% under amqp_client's supervision tree.<br/>
+%% <br/>
+%% In case of a failure or an AMQP error, the channel process exits with a
+%% meaningful exit reason:<br/>
+%% <br/>
+%% <table>
+%% <tr>
+%% <td><strong>Cause</strong></td>
+%% <td><strong>Exit reason</strong></td>
+%% </tr>
+%% <tr>
+%% <td>Any reason, where Code would have been 200 otherwise</td>
+%% <td>```normal'''</td>
+%% </tr>
+%% <tr>
+%% <td>User application calls amqp_channel:close/3</td>
+%% <td>```close_reason(app_initiated_close)'''</td>
+%% </tr>
+%% <tr>
+%% <td>Server closes channel (soft error)</td>
+%% <td>```close_reason(server_initiated_close)'''</td>
+%% </tr>
+%% <tr>
+%% <td>Server misbehaved (did not follow protocol)</td>
+%% <td>```close_reason(server_misbehaved)'''</td>
+%% </tr>
+%% <tr>
+%% <td>Connection is closing (causing all channels to cleanup and
+%% close)</td>
+%% <td>```{shutdown, {connection_closing, amqp_reason(atom())}}'''</td>
+%% </tr>
+%% <tr>
+%% <td>Other error</td>
+%% <td>(various error reasons, causing more detailed logging)</td>
+%% </tr>
+%% </table>
+%% <br/>
+%% See type definitions below.
-module(amqp_channel).
-include("amqp_client.hrl").
-behaviour(gen_server).
--export([start_link/3, connection_closing/3, open/1]).
+-export([start_link/4, connection_closing/3, open/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
-export([call/2, call/3, cast/2, cast/3]).
@@ -40,12 +81,13 @@
-define(TIMEOUT_CLOSE_OK, 3000).
-record(state, {number,
- sup,
+ connection,
driver,
rpc_requests = queue:new(),
anon_sub_requests = queue:new(),
tagged_sub_requests = dict:new(),
- closing = false, %% false | just_channel |
+ closing = false, %% false |
+ %% {just_channel, Reason} |
%% {connection, Reason}
writer,
return_handler_pid = none,
@@ -130,21 +172,21 @@ cast(Channel, Method) ->
cast(Channel, Method, Content) ->
gen_server:cast(Channel, {cast, Method, Content}).
-%% @spec (Channel) -> ok
+%% @spec (Channel) -> ok | closing
%% where
%% Channel = pid()
%% @doc Closes the channel, invokes
%% close(Channel, 200, &lt;&lt;"Goodbye"&gt;&gt;).
close(Channel) ->
close(Channel, 200, <<"Goodbye">>).
-%% @spec (Channel, Code, Text) -> ok
+%% @spec (Channel, Code, Text) -> ok | closing
%% where
%% Channel = pid()
%% Code = integer()
%% Text = binary()
%% @doc Closes the channel, allowing the caller to supply a reply code and
-%% text.
+%% text. If the channel is already closing, the atom 'closing' is returned.
close(Channel, Code, Text) ->
gen_server:call(Channel, {close, Code, Text}, infinity).
@@ -235,8 +277,9 @@ register_default_consumer(Channel, Consumer) ->
%%---------------------------------------------------------------------------
%% @private
-start_link(Driver, ChannelNumber, SWF) ->
- gen_server:start_link(?MODULE, [self(), Driver, ChannelNumber, SWF], []).
+start_link(Driver, Connection, ChannelNumber, SWF) ->
+ gen_server:start_link(?MODULE,
+ [Driver, Connection, ChannelNumber, SWF], []).
%% @private
connection_closing(Pid, ChannelCloseType, Reason) ->
@@ -251,8 +294,8 @@ open(Pid) ->
%%---------------------------------------------------------------------------
%% @private
-init([Sup, Driver, ChannelNumber, SWF]) ->
- {ok, #state{sup = Sup,
+init([Driver, Connection, ChannelNumber, SWF]) ->
+ {ok, #state{connection = Connection,
driver = Driver,
number = ChannelNumber,
start_writer_fun = SWF}}.
@@ -478,8 +521,9 @@ rpc_top_half(Method, Content, From,
rpc_bottom_half(Reply, State = #state{rpc_requests = RequestQueue}) ->
{{value, {From, _Method, _Content}}, RequestQueue1} =
queue:out(RequestQueue),
- case From of none -> ok;
- _ -> gen_server:reply(From, Reply)
+ case From of
+ none -> ok;
+ _ -> gen_server:reply(From, Reply)
end,
do_rpc(State#state{rpc_requests = RequestQueue1}).
@@ -493,7 +537,8 @@ do_rpc(State = #state{rpc_requests = Q,
true -> State1;
false -> case {From, DoRet} of
{none, _} -> ok;
- {_, ok} -> gen_server:reply(From, ok)
+ {_, ok} -> gen_server:reply(From, ok);
+ _ -> ok
%% Do not reply if error in do. Expecting
%% {channel_exit, ...}
end,
@@ -510,10 +555,11 @@ do_rpc(State = #state{rpc_requests = Q,
State#state{rpc_requests = NewQ}
end.
-pre_do(#'channel.open'{}, _Content, State) ->
+pre_do(#'channel.open'{}, none, State) ->
start_writer(State);
-pre_do(#'channel.close'{}, _Content, State) ->
- State#state{closing = just_channel};
+pre_do(#'channel.close'{reply_code = Code, reply_text = Text}, none,
+ State) ->
+ State#state{closing = {just_channel, {app_initiated_close, Code, Text}}};
pre_do(_, _, State) ->
State.
@@ -530,10 +576,10 @@ handle_method_from_server(Method, Content, State = #state{closing = Closing}) ->
method = element(1, Method)},
State);
false -> Drop = case {Closing, Method} of
- {just_channel, #'channel.close'{}} -> false;
- {just_channel, #'channel.close_ok'{}} -> false;
- {just_channel, _} -> true;
- _ -> false
+ {{just_channel, _}, #'channel.close'{}} -> false;
+ {{just_channel, _}, #'channel.close_ok'{}} -> false;
+ {{just_channel, _}, _} -> true;
+ _ -> false
end,
if Drop -> ?LOG_INFO("Channel (~p): dropping method ~p from "
"server because channel is closing~n",
@@ -546,12 +592,32 @@ handle_method_from_server(Method, Content, State = #state{closing = Closing}) ->
handle_method_from_server1(#'channel.open_ok'{}, none, State) ->
{noreply, rpc_bottom_half(ok, State)};
+handle_method_from_server1(#'channel.close'{reply_code = Code,
+ reply_text = Text},
+ none,
+ State = #state{closing = {just_channel, _}}) ->
+ %% Both client and server sent close at the same time. Don't shutdown yet,
+ %% wait for close_ok.
+ do(#'channel.close_ok'{}, none, State),
+ erlang:send_after(?TIMEOUT_CLOSE_OK, self(), timed_out_waiting_close_ok),
+ {noreply,
+ State#state{
+ closing = {just_channel, {server_initiated_close, Code, Text}}}};
handle_method_from_server1(#'channel.close'{reply_code = Code,
reply_text = Text}, none, State) ->
do(#'channel.close_ok'{}, none, State),
- {stop, {server_initiated_close, Code, Text}, State};
-handle_method_from_server1(#'channel.close_ok'{}, none, State) ->
- {stop, normal, rpc_bottom_half(ok, State)};
+ handle_shutdown({server_initiated_close, Code, Text}, State);
+handle_method_from_server1(#'channel.close_ok'{}, none,
+ State = #state{closing = Closing}) ->
+ case Closing of
+ {just_channel, {app_initiated_close, _, _} = Reason} ->
+ handle_shutdown(Reason, rpc_bottom_half(ok, State));
+ {just_channel, {server_initiated_close, _, _} = Reason} ->
+ handle_shutdown(Reason,
+ rpc_bottom_half(closing, State));
+ {connection, Reason} ->
+ handle_shutdown({connection_closing, Reason}, State)
+ end;
handle_method_from_server1(
#'basic.consume_ok'{consumer_tag = ConsumerTag} = ConsumeOk,
none, State = #state{tagged_sub_requests = Tagged,
@@ -644,36 +710,42 @@ handle_connection_closing(CloseType, Reason,
erlang:send_after(?TIMEOUT_FLUSH, self(),
timed_out_flushing_channel),
{noreply, NewState};
- {flush, just_channel, false} ->
+ {flush, {just_channel, _}, false} ->
erlang:send_after(?TIMEOUT_CLOSE_OK, self(),
timed_out_waiting_close_ok),
{noreply, NewState};
_ ->
handle_shutdown({connection_closing, Reason}, NewState)
end.
-handle_channel_exit(Reason, State) ->
+handle_channel_exit(Reason, State = #state{connection = Connection}) ->
case Reason of
%% Sent by rabbit_channel in the direct case
#amqp_error{name = ErrorName, explanation = Expl} ->
?LOG_WARN("Channel (~p) closing: server sent error ~p~n",
[self(), Reason]),
{IsHard, Code, _} = ?PROTOCOL:lookup_amqp_exception(ErrorName),
- {stop, if IsHard -> {connection_closing,
- {server_initiated_hard_close, Code, Expl}};
- true -> {server_initiated_close, Code, Expl}
- end, State};
+ ReportedReason = {server_initiated_close, Code, Expl},
+ handle_shutdown(
+ if IsHard ->
+ amqp_gen_connection:hard_error_in_channel(
+ Connection, self(), ReportedReason),
+ {connection_closing, ReportedReason};
+ true -> ReportedReason
+ end, State);
%% Unexpected death of a channel infrastructure process
_ ->
{stop, {infrastructure_died, Reason}, State}
end.
handle_shutdown({_, 200, _}, State) ->
{stop, normal, State};
+handle_shutdown({connection_closing, {_, 200, _}}, State) ->
+ {stop, normal, State};
handle_shutdown({connection_closing, normal}, State) ->
{stop, normal, State};
handle_shutdown(Reason, State) ->
- {stop, Reason, State}.
+ {stop, {shutdown, Reason}, State}.
%%---------------------------------------------------------------------------
%% Internal plumbing
@@ -728,7 +800,7 @@ build_content(none) ->
build_content(#amqp_msg{props = Props, payload = Payload}) ->
rabbit_basic:build_content(Props, Payload).
-check_block(_Method, _AmqpMsg, #state{closing = just_channel}) ->
+check_block(_Method, _AmqpMsg, #state{closing = {just_channel, _}}) ->
closing;
check_block(_Method, _AmqpMsg, #state{closing = {connection, _}}) ->
closing;
@@ -760,7 +832,7 @@ is_connection_method(Method) ->
server_misbehaved(#amqp_error{} = AmqpError, State = #state{number = Number}) ->
case rabbit_binary_generator:map_exception(Number, AmqpError, ?PROTOCOL) of
{0, _} ->
- {stop, {server_misbehaved, AmqpError}, State};
+ handle_shutdown({server_misbehaved, AmqpError}, State);
{_, Close} ->
?LOG_WARN("Channel (~p) flushing and closing due to soft "
"error caused by the server ~p~n", [self(), AmqpError]),
View
@@ -21,18 +21,18 @@
-behaviour(supervisor2).
--export([start_link/3]).
+-export([start_link/4]).
-export([init/1]).
%%---------------------------------------------------------------------------
%% Interface
%%---------------------------------------------------------------------------
-start_link(Type, InfraArgs, ChNumber) ->
+start_link(Type, Connection, InfraArgs, ChNumber) ->
{ok, Sup} = supervisor2:start_link(?MODULE, []),
{ok, ChPid} = supervisor2:start_child(
Sup, {channel, {amqp_channel, start_link,
- [Type, ChNumber,
+ [Type, Connection, ChNumber,
start_writer_fun(Sup, Type, InfraArgs,
ChNumber)]},
intrinsic, brutal_kill, worker, [amqp_channel]}),
Oops, something went wrong.

0 comments on commit c62f4df

Please sign in to comment.