Skip to content
Browse files

Initial commit

  • Loading branch information...
0 parents commit c8f08c20652e4a9f4250bc0f6d97ddcc5c7f6e0b @jbrisbin committed Jan 19, 2011
18 README.md
@@ -0,0 +1,18 @@
+= Rebar-friendly fork of Erlang AMQP client
+
+This is a fork of the [official RabbitMQ/AMQP Erlang client](https://github.com/rabbitmq/rabbitmq-erlang-client).
+
+It's meant to be included in your rebar projects in your rebar.config file:
+
+ {deps, [
+ {rabbit_common, ".*", {git, "git://github.com/jbrisbin/rabbit_common.git", "HEAD"}},
+ {amqp_client, ".*", {git, "git://github.com/jbrisbin/amqp_client.git", "HEAD"}}
+ ]}.
+
+This is simply a re-packaging of the AMQP client, which is licensed under the MPL:
+
+ This package, the RabbitMQ server is licensed under the MPL. For the
+ MPL, please see LICENSE-MPL-RabbitMQ.
+
+ If you have any questions regarding licensing, please contact us at
+ info@rabbitmq.com.
44 include/amqp_client.hrl
@@ -0,0 +1,44 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit_framing.hrl").
+
+-define(PROTOCOL_VERSION_MAJOR, 0).
+-define(PROTOCOL_VERSION_MINOR, 9).
+-define(PROTOCOL_HEADER, <<"AMQP", 0, 0, 9, 1>>).
+-define(PROTOCOL, rabbit_framing_amqp_0_9_1).
+
+-define(MAX_CHANNEL_NUMBER, 65535).
+
+-record(amqp_msg, {props = #'P_basic'{}, payload = <<>>}).
+
+-record(amqp_params, {username = <<"guest">>,
+ password = <<"guest">>,
+ virtual_host = <<"/">>,
+ host = "localhost",
+ port = ?PROTOCOL_PORT,
+ channel_max = 0,
+ frame_max = 0,
+ heartbeat = 0,
+ ssl_options = none,
+ auth_mechanisms = [fun amqp_auth_mechanisms:plain/3,
+ fun amqp_auth_mechanisms:amqplain/3],
+ client_properties = []}).
+
+-define(LOG_DEBUG(Format), error_logger:info_msg(Format)).
+-define(LOG_INFO(Format, Args), error_logger:info_msg(Format, Args)).
+-define(LOG_WARN(Format, Args), error_logger:warning_msg(Format, Args)).
3 rebar.config
@@ -0,0 +1,3 @@
+{deps, [
+ {rabbit_common, ".*", {git, "git://github.com/jbrisbin/rabbit_common.git", "HEAD"}},
+]}.
42 src/amqp_auth_mechanisms.erl
@@ -0,0 +1,42 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+-module(amqp_auth_mechanisms).
+
+-include("amqp_client.hrl").
+
+-export([plain/3, amqplain/3, external/3]).
+
+%%---------------------------------------------------------------------------
+
+plain(none, _, init) ->
+ {<<"PLAIN">>, []};
+plain(none, #amqp_params{username = Username,
+ password = Password}, _State) ->
+ {<<0, Username/binary, 0, Password/binary>>, _State}.
+
+amqplain(none, _, init) ->
+ {<<"AMQPLAIN">>, []};
+amqplain(none, #amqp_params{username = Username,
+ password = Password}, _State) ->
+ LoginTable = [{<<"LOGIN">>, longstr, Username},
+ {<<"PASSWORD">>, longstr, Password}],
+ {rabbit_binary_generator:generate_table(LoginTable), _State}.
+
+external(none, _, init) ->
+ {<<"EXTERNAL">>, []};
+external(none, _, _State) ->
+ {<<"">>, _State}.
738 src/amqp_channel.erl
@@ -0,0 +1,738 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+%% @doc This module encapsulates the client's view of an AMQP
+%% channel. Each server side channel is represented by an amqp_channel
+%% process on the client side. Channel processes are created using the
+%% {@link amqp_connection} module. Channel processes are supervised
+%% under amqp_client's supervision tree.
+-module(amqp_channel).
+
+-include("amqp_client.hrl").
+
+-behaviour(gen_server).
+
+-export([start_link/3, connection_closing/2, open/1]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2]).
+-export([call/2, call/3, cast/2, cast/3]).
+-export([subscribe/3]).
+-export([close/1, close/3]).
+-export([register_return_handler/2]).
+-export([register_flow_handler/2]).
+-export([register_ack_handler/2]).
+-export([next_publish_seqno/1]).
+-export([register_default_consumer/2]).
+
+-define(TIMEOUT_FLUSH, 60000).
+-define(TIMEOUT_CLOSE_OK, 3000).
+
+-record(state, {number,
+ sup,
+ driver,
+ rpc_requests = queue:new(),
+ anon_sub_requests = queue:new(),
+ tagged_sub_requests = dict:new(),
+ closing = false,
+ writer,
+ return_handler_pid = none,
+ ack_handler_pid = none,
+ next_pub_seqno = 0,
+ flow_active = true,
+ flow_handler_pid = none,
+ consumers = dict:new(),
+ default_consumer = none,
+ start_writer_fun
+ }).
+
+%%---------------------------------------------------------------------------
+%% Type Definitions
+%%---------------------------------------------------------------------------
+
+%% @type amqp_method().
+%% This abstract datatype represents the set of methods that comprise
+%% the AMQP execution model. As indicated in the overview, the
+%% attributes of each method in the execution model are described in
+%% the protocol documentation. The Erlang record definitions are
+%% autogenerated from a parseable version of the specification. Most
+%% fields in the generated records have sensible default values that
+%% you need not worry in the case of a simple usage of the client
+%% library.
+
+%% @type amqp_msg() = #amqp_msg{}.
+%% This is the content encapsulated in content-bearing AMQP methods. It
+%% contains the following fields:
+%% <ul>
+%% <li>props :: class_property() - A class property record, defaults to
+%% #'P_basic'{}</li>
+%% <li>payload :: binary() - The arbitrary data payload</li>
+%% </ul>
+
+%%---------------------------------------------------------------------------
+%% AMQP Channel API methods
+%%---------------------------------------------------------------------------
+
+%% @spec (Channel, Method) -> Result
+%% @doc This is equivalent to amqp_channel:call(Channel, Method, none).
+call(Channel, Method) ->
+ gen_server:call(Channel, {call, Method, none}, infinity).
+
+%% @spec (Channel, Method, Content) -> Result
+%% where
+%% Channel = pid()
+%% Method = amqp_method()
+%% Content = amqp_msg() | none
+%% Result = amqp_method() | ok | blocked | closing
+%% @doc This sends an AMQP method on the channel.
+%% For content bearing methods, Content has to be an amqp_msg(), whereas
+%% for non-content bearing methods, it needs to be the atom 'none'.<br/>
+%% In the case of synchronous methods, this function blocks until the
+%% corresponding reply comes back from the server and returns it.
+%% In the case of asynchronous methods, the function blocks until the method
+%% gets sent on the wire and returns the atom 'ok' on success.<br/>
+%% This will return the atom 'blocked' if the server has
+%% throttled the client for flow control reasons. This will return the
+%% atom 'closing' if the channel is in the process of shutting down.<br/>
+%% Note that for asynchronous methods, the synchronicity implied by
+%% 'call' only means that the client has transmitted the method to
+%% the broker. It does not necessarily imply that the broker has
+%% accepted responsibility for the message.
+call(Channel, Method, Content) ->
+ gen_server:call(Channel, {call, Method, Content}, infinity).
+
+%% @spec (Channel, Method) -> ok
+%% @doc This is equivalent to amqp_channel:cast(Channel, Method, none).
+cast(Channel, Method) ->
+ gen_server:cast(Channel, {cast, Method, none}).
+
+%% @spec (Channel, Method, Content) -> ok
+%% where
+%% Channel = pid()
+%% Method = amqp_method()
+%% Content = amqp_msg() | none
+%% @doc This function is the same as {@link call/3}, except that it returns
+%% immediately with the atom 'ok', without blocking the caller process.
+%% This function is not recommended with synchronous methods, since there is no
+%% way to verify that the server has received the method.
+cast(Channel, Method, Content) ->
+ gen_server:cast(Channel, {cast, Method, Content}).
+
+%% @spec (Channel) -> ok
+%% where
+%% Channel = pid()
+%% @doc Closes the channel, invokes
+%% close(Channel, 200, &lt;&lt;"Goodbye"&gt;&gt;).
+close(Channel) ->
+ close(Channel, 200, <<"Goodbye">>).
+
+%% @spec (Channel, Code, Text) -> ok
+%% where
+%% Channel = pid()
+%% Code = integer()
+%% Text = binary()
+%% @doc Closes the channel, allowing the caller to supply a reply code and
+%% text.
+close(Channel, Code, Text) ->
+ gen_server:call(Channel, {close, Code, Text}, infinity).
+
+%% @spec (Channel) -> integer()
+%% where
+%% Channel = pid()
+%% @doc When in confirm mode, returns the sequence number of the next
+%% message to be published.
+next_publish_seqno(Channel) ->
+ gen_server:call(Channel, next_publish_seqno).
+
+%%---------------------------------------------------------------------------
+%% Consumer registration (API)
+%%---------------------------------------------------------------------------
+
+%% @type consume() = #'basic.consume'{}.
+%% The AMQP method that is used to subscribe a consumer to a queue.
+%% @spec (Channel, consume(), Consumer) -> amqp_method()
+%% where
+%% Channel = pid()
+%% Consumer = pid()
+%% @doc Creates a subscription to a queue. This subscribes a consumer pid to
+%% the queue defined in the #'basic.consume'{} method record. Note that
+%% both the process invoking this method and the supplied consumer process
+%% receive an acknowledgement of the subscription. The calling process will
+%% receive the acknowledgement as the return value of this function, whereas
+%% the consumer process will receive the notification asynchronously.
+subscribe(Channel, BasicConsume = #'basic.consume'{}, Consumer) ->
+ gen_server:call(Channel, {subscribe, BasicConsume, Consumer}, infinity).
+
+%% @spec (Channel, ReturnHandler) -> ok
+%% where
+%% Channel = pid()
+%% ReturnHandler = pid()
+%% @doc This registers a handler to deal with returned messages. The
+%% registered process will receive #basic.return{} records.
+register_return_handler(Channel, ReturnHandler) ->
+ gen_server:cast(Channel, {register_return_handler, ReturnHandler} ).
+
+%% @spec (Channel, AckHandler) -> ok
+%% where
+%% Channel = pid()
+%% AckHandler = pid()
+%% @doc This registers a handler to deal with ack'd messages. The
+%% registered process will receive #basic.ack{} commands.
+register_ack_handler(Channel, AckHandler) ->
+ gen_server:cast(Channel, {register_ack_handler, AckHandler} ).
+
+%% @spec (Channel, FlowHandler) -> ok
+%% where
+%% Channel = pid()
+%% FlowHandler = pid()
+%% @doc This registers a handler to deal with channel flow notifications.
+%% The registered process will receive #channel.flow{} records.
+register_flow_handler(Channel, FlowHandler) ->
+ gen_server:cast(Channel, {register_flow_handler, FlowHandler} ).
+
+%% @spec (Channel, Consumer) -> ok
+%% where
+%% Channel = pid()
+%% Consumer = pid()
+%% @doc Set the current default consumer.
+%% Under certain circumstances it is possible for a channel to receive a
+%% message delivery which does not match any consumer which is currently
+%% set up via basic.consume. This will occur after the following sequence
+%% of events:<br/>
+%% <br/>
+%% basic.consume with explicit acks<br/>
+%% %% some deliveries take place but are not acked<br/>
+%% basic.cancel<br/>
+%% basic.recover{requeue = false}<br/>
+%% <br/>
+%% Since requeue is specified to be false in the basic.recover, the spec
+%% states that the message must be redelivered to "the original recipient"
+%% - i.e. the same channel / consumer-tag. But the consumer is no longer
+%% active.<br/>
+%% In these circumstances, you can register a default consumer to handle
+%% such deliveries. If no default consumer is registered then the channel
+%% will exit on receiving such a delivery.<br/>
+%% Most people will not need to use this.
+register_default_consumer(Channel, Consumer) ->
+ gen_server:cast(Channel, {register_default_consumer, Consumer}).
+
+%%---------------------------------------------------------------------------
+%% Internal interface
+%%---------------------------------------------------------------------------
+
+%% @private
+start_link(Driver, ChannelNumber, SWF) ->
+ gen_server:start_link(?MODULE, [self(), Driver, ChannelNumber, SWF], []).
+
+%% @private
+connection_closing(Pid, ChannelCloseType) ->
+ gen_server:cast(Pid, {connection_closing, ChannelCloseType}).
+
+%% @private
+open(Pid) ->
+ gen_server:call(Pid, open, infinity).
+
+%%---------------------------------------------------------------------------
+%% gen_server callbacks
+%%---------------------------------------------------------------------------
+
+%% @private
+init([Sup, Driver, ChannelNumber, SWF]) ->
+ {ok, #state{sup = Sup,
+ driver = Driver,
+ number = ChannelNumber,
+ start_writer_fun = SWF}}.
+
+%% @private
+handle_call(open, From, State) ->
+ {noreply, rpc_top_half(#'channel.open'{}, none, From, State)};
+%% @private
+handle_call({close, Code, Text}, From, State) ->
+ handle_close(Code, Text, From, State);
+%% @private
+handle_call({call, Method, AmqpMsg}, From, State) ->
+ handle_method_to_server(Method, AmqpMsg, From, State);
+%% @private
+handle_call({subscribe, Method, Consumer}, From, State) ->
+ handle_subscribe(Method, Consumer, From, State);
+%% Handles the delivery of messages from a direct channel
+%% @private
+handle_call({send_command_sync, Method, Content}, From, State) ->
+ Ret = handle_method_from_server(Method, Content, State),
+ gen_server:reply(From, ok),
+ Ret;
+%% Handles the delivery of messages from a direct channel
+%% @private
+handle_call({send_command_sync, Method}, From, State) ->
+ Ret = handle_method_from_server(Method, none, State),
+ gen_server:reply(From, ok),
+ Ret;
+%% When in confirm mode, returns the sequence number of the next
+%% message to be published.
+%% @private
+handle_call(next_publish_seqno, _From,
+ State = #state{next_pub_seqno = SeqNo}) ->
+ {reply, SeqNo, State}.
+
+%% @private
+handle_cast({cast, Method, AmqpMsg}, State) ->
+ handle_method_to_server(Method, AmqpMsg, none, State);
+%% Registers a handler to process return messages
+%% @private
+handle_cast({register_return_handler, ReturnHandler}, State) ->
+ erlang:monitor(process, ReturnHandler),
+ {noreply, State#state{return_handler_pid = ReturnHandler}};
+%% Registers a handler to process ack messages
+%% @private
+handle_cast({register_ack_handler, AckHandler}, State) ->
+ erlang:monitor(process, AckHandler),
+ {noreply, State#state{ack_handler_pid = AckHandler}};
+%% Registers a handler to process flow control messages
+%% @private
+handle_cast({register_flow_handler, FlowHandler}, State) ->
+ erlang:monitor(process, FlowHandler),
+ {noreply, State#state{flow_handler_pid = FlowHandler}};
+%% Registers a handler to process unexpected deliveries
+%% @private
+handle_cast({register_default_consumer, Consumer}, State) ->
+ erlang:monitor(process, Consumer),
+ {noreply, State#state{default_consumer = Consumer}};
+%% Received from channels manager
+%% @private
+handle_cast({method, Method, Content}, State) ->
+ handle_method_from_server(Method, Content, State);
+%% Handles the situation when the connection closes without closing the channel
+%% beforehand. The channel must block all further RPCs,
+%% flush the RPC queue (optional), and terminate
+%% @private
+handle_cast({connection_closing, CloseType}, State) ->
+ handle_connection_closing(CloseType, State);
+%% @private
+handle_cast({shutdown, {_, 200, _}}, State) ->
+ {stop, normal, State};
+%% @private
+handle_cast({shutdown, Reason}, State) ->
+ {stop, Reason, State}.
+
+%% Received from rabbit_channel in the direct case
+%% @private
+handle_info({send_command, Method}, State) ->
+ handle_method_from_server(Method, none, State);
+%% Received from rabbit_channel in the direct case
+%% @private
+handle_info({send_command, Method, Content}, State) ->
+ handle_method_from_server(Method, Content, State);
+%% Received from rabbit_channel in the direct case
+%% @private
+handle_info({send_command_and_notify, Q, ChPid, Method, Content}, State) ->
+ handle_method_from_server(Method, Content, State),
+ rabbit_amqqueue:notify_sent(Q, ChPid),
+ {noreply, State};
+%% This comes from the writer or rabbit_channel
+%% @private
+handle_info({channel_exit, _FrPidOrChNumber, Reason}, State) ->
+ handle_channel_exit(Reason, State);
+%% @private
+handle_info(timed_out_flushing_channel, State) ->
+ ?LOG_WARN("Channel (~p) closing: timed out flushing while "
+ "connection closing~n", [self()]),
+ {stop, timed_out_flushing_channel, State};
+%% @private
+handle_info(timed_out_waiting_close_ok, State) ->
+ ?LOG_WARN("Channel (~p) closing: timed out waiting for "
+ "channel.close_ok while connection closing~n", [self()]),
+ {stop, timed_out_waiting_close_ok, State};
+%% @private
+handle_info({'DOWN', _, process, ReturnHandler, Reason},
+ State = #state{return_handler_pid = ReturnHandler}) ->
+ ?LOG_WARN("Channel (~p): Unregistering return handler ~p because it died. "
+ "Reason: ~p~n", [self(), ReturnHandler, Reason]),
+ {noreply, State#state{return_handler_pid = none}};
+%% @private
+handle_info({'DOWN', _, process, AckHandler, Reason},
+ State = #state{ack_handler_pid = AckHandler}) ->
+ ?LOG_WARN("Channel (~p): Unregistering ack handler ~p because it died. "
+ "Reason: ~p~n", [self(), AckHandler, Reason]),
+ {noreply, State#state{ack_handler_pid = none}};
+%% @private
+handle_info({'DOWN', _, process, FlowHandler, Reason},
+ State = #state{flow_handler_pid = FlowHandler}) ->
+ ?LOG_WARN("Channel (~p): Unregistering flow handler ~p because it died. "
+ "Reason: ~p~n", [self(), FlowHandler, Reason]),
+ {noreply, State#state{flow_handler_pid = none}};
+%% @private
+handle_info({'DOWN', _, process, DefaultConsumer, Reason},
+ State = #state{default_consumer = DefaultConsumer}) ->
+ ?LOG_WARN("Channel (~p): Unregistering default consumer ~p because it died."
+ "Reason: ~p~n", [self(), DefaultConsumer, Reason]),
+ {noreply, State#state{default_consumer = none}}.
+
+%% @private
+terminate(Reason, #state{rpc_requests = RpcQueue}) ->
+ case queue:is_empty(RpcQueue) of
+ false -> ?LOG_WARN("Channel (~p): RPC queue was not empty on "
+ "terminate~n", [self()]),
+ case Reason of
+ normal -> exit(rpc_queue_not_empty_on_terminate);
+ _ -> ok
+ end;
+ true -> ok
+ end.
+
+%% @private
+code_change(_OldVsn, State, _Extra) ->
+ State.
+
+%%---------------------------------------------------------------------------
+%% RPC mechanism
+%%---------------------------------------------------------------------------
+
+handle_method_to_server(Method, AmqpMsg, From, State) ->
+ case {check_invalid_method(Method), From,
+ check_block(Method, AmqpMsg, State)} of
+ {ok, _, ok} ->
+ State1 = case {Method, State#state.next_pub_seqno} of
+ {#'confirm.select'{}, _} ->
+ State#state{next_pub_seqno = 1};
+ {#'basic.publish'{}, 0} ->
+ State;
+ {#'basic.publish'{}, SeqNo} ->
+ State#state{next_pub_seqno = SeqNo + 1};
+ _ ->
+ State
+ end,
+ {noreply,
+ rpc_top_half(Method, build_content(AmqpMsg), From, State1)};
+ {ok, none, BlockReply} ->
+ ?LOG_WARN("Channel (~p): discarding method ~p in cast.~n"
+ "Reason: ~p~n", [self(), Method, BlockReply]),
+ {noreply, State};
+ {ok, _, BlockReply} ->
+ {reply, BlockReply, State};
+ {{_, InvalidMethodMessage}, none, _} ->
+ ?LOG_WARN("Channel (~p): ignoring cast of ~p method. " ++
+ InvalidMethodMessage ++ "~n", [self(), Method]),
+ {noreply, State};
+ {{InvalidMethodReply, _}, _, _} ->
+ {reply, {error, InvalidMethodReply}, State}
+ end.
+
+handle_close(Code, Text, From, State) ->
+ Close = #'channel.close'{reply_code = Code,
+ reply_text = Text,
+ class_id = 0,
+ method_id = 0},
+ case check_block(Close, none, State) of
+ ok -> {noreply, rpc_top_half(Close, none, From, State)};
+ BlockReply -> {reply, BlockReply, State}
+ end.
+
+handle_subscribe(#'basic.consume'{consumer_tag = Tag} = Method, Consumer,
+ From, State = #state{tagged_sub_requests = Tagged,
+ anon_sub_requests = Anon,
+ consumers = Consumers}) ->
+ case check_block(Method, none, State) of
+ ok when Tag =:= undefined orelse size(Tag) == 0 ->
+ NewMethod = Method#'basic.consume'{consumer_tag = <<"">>},
+ NewState = State#state{anon_sub_requests =
+ queue:in(Consumer, Anon)},
+ {noreply, rpc_top_half(NewMethod, none, From, NewState)};
+ ok when is_binary(Tag) ->
+ case dict:is_key(Tag, Tagged) orelse dict:is_key(Tag, Consumers) of
+ true ->
+ {reply, {error, consumer_tag_already_in_use}, State};
+ false ->
+ NewState = State#state{tagged_sub_requests =
+ dict:store(Tag, Consumer, Tagged)},
+ {noreply, rpc_top_half(Method, none, From, NewState)}
+ end;
+ BlockReply ->
+ {reply, BlockReply, State}
+ end.
+
+rpc_top_half(Method, Content, From,
+ State0 = #state{rpc_requests = RequestQueue}) ->
+ State1 = State0#state{
+ rpc_requests = queue:in({From, Method, Content}, RequestQueue)},
+ IsFirstElement = queue:is_empty(RequestQueue),
+ if IsFirstElement -> do_rpc(State1);
+ true -> State1
+ end.
+
+rpc_bottom_half(Reply, State = #state{rpc_requests = RequestQueue}) ->
+ {{value, {From, _Method, _Content}}, RequestQueue1} =
+ queue:out(RequestQueue),
+ case From of none -> ok;
+ _ -> gen_server:reply(From, Reply)
+ end,
+ do_rpc(State#state{rpc_requests = RequestQueue1}).
+
+do_rpc(State = #state{rpc_requests = Q,
+ closing = Closing}) ->
+ case queue:out(Q) of
+ {{value, {From, Method, Content}}, NewQ} ->
+ State1 = pre_do(Method, Content, State),
+ DoRet = do(Method, Content, State1),
+ case ?PROTOCOL:is_method_synchronous(Method) of
+ true -> State1;
+ false -> case {From, DoRet} of
+ {none, _} -> ok;
+ {_, ok} -> gen_server:reply(From, ok)
+ %% Do not reply if error in do. Expecting
+ %% {channel_exit, ...}
+ end,
+ do_rpc(State1#state{rpc_requests = NewQ})
+ end;
+ {empty, NewQ} ->
+ case Closing of
+ connection -> gen_server:cast(self(),
+ {shutdown, connection_closing});
+ _ -> ok
+ end,
+ State#state{rpc_requests = NewQ}
+ end.
+
+pre_do(#'channel.open'{}, _Content, State) ->
+ start_writer(State);
+pre_do(#'channel.close'{}, _Content, State) ->
+ State#state{closing = just_channel};
+pre_do(_, _, State) ->
+ State.
+
+%%---------------------------------------------------------------------------
+%% Handling of methods from the server
+%%---------------------------------------------------------------------------
+
+handle_method_from_server(Method, Content, State = #state{closing = Closing}) ->
+ case is_connection_method(Method) of
+ true -> server_misbehaved(
+ #amqp_error{name = command_invalid,
+ explanation = "connection method on "
+ "non-zero channel",
+ method = element(1, Method)},
+ State);
+ false -> Drop = case {Closing, Method} of
+ {just_channel, #'channel.close'{}} -> false;
+ {just_channel, #'channel.close_ok'{}} -> false;
+ {just_channel, _} -> true;
+ _ -> false
+ end,
+ if Drop -> ?LOG_INFO("Channel (~p): dropping method ~p from "
+ "server because channel is closing~n",
+ [self(), {Method, Content}]),
+ {noreply, State};
+ true -> handle_method_from_server1(Method,
+ amqp_msg(Content), State)
+ end
+ end.
+
+handle_method_from_server1(#'channel.open_ok'{}, none, State) ->
+ {noreply, rpc_bottom_half(ok, State)};
+handle_method_from_server1(#'channel.close'{reply_code = Code,
+ reply_text = Text}, none, State) ->
+ do(#'channel.close_ok'{}, none, State),
+ {stop, {server_initiated_close, Code, Text}, State};
+handle_method_from_server1(#'channel.close_ok'{}, none, State) ->
+ {stop, normal, rpc_bottom_half(ok, State)};
+handle_method_from_server1(
+ #'basic.consume_ok'{consumer_tag = ConsumerTag} = ConsumeOk,
+ none, State = #state{tagged_sub_requests = Tagged,
+ anon_sub_requests = Anon}) ->
+ {Consumer, State0} =
+ case dict:find(ConsumerTag, Tagged) of
+ {ok, C} ->
+ NewTagged = dict:erase(ConsumerTag, Tagged),
+ {C, State#state{tagged_sub_requests = NewTagged}};
+ error ->
+ {{value, C}, NewAnon} = queue:out(Anon),
+ {C, State#state{anon_sub_requests = NewAnon}}
+ end,
+ Consumer ! ConsumeOk,
+ State1 = register_consumer(ConsumerTag, Consumer, State0),
+ {noreply, rpc_bottom_half(ConsumeOk, State1)};
+handle_method_from_server1(
+ #'basic.cancel_ok'{consumer_tag = ConsumerTag} = CancelOk, none,
+ State) ->
+ Consumer = resolve_consumer(ConsumerTag, State),
+ Consumer ! CancelOk,
+ NewState = unregister_consumer(ConsumerTag, State),
+ {noreply, rpc_bottom_half(CancelOk, NewState)};
+handle_method_from_server1(#'channel.flow'{active = Active} = Flow, none,
+ State = #state{flow_handler_pid = FlowHandler}) ->
+ case FlowHandler of none -> ok;
+ _ -> FlowHandler ! Flow
+ end,
+ %% Putting the flow_ok in the queue so that the RPC queue can be
+ %% flushed beforehand. Methods that made it to the queue are not
+ %% blocked in any circumstance.
+ {noreply, rpc_top_half(#'channel.flow_ok'{active = Active}, none, none,
+ State#state{flow_active = Active})};
+handle_method_from_server1(
+ #'basic.deliver'{consumer_tag = ConsumerTag} = Deliver, AmqpMsg,
+ State) ->
+ Consumer = resolve_consumer(ConsumerTag, State),
+ Consumer ! {Deliver, AmqpMsg},
+ {noreply, State};
+handle_method_from_server1(
+ #'basic.return'{} = BasicReturn, AmqpMsg,
+ State = #state{return_handler_pid = ReturnHandler}) ->
+ case ReturnHandler of
+ none -> ?LOG_WARN("Channel (~p): received {~p, ~p} but there is no "
+ "return handler registered~n",
+ [self(), BasicReturn, AmqpMsg]);
+ _ -> ReturnHandler ! {BasicReturn, AmqpMsg}
+ end,
+ {noreply, State};
+handle_method_from_server1(#'basic.ack'{} = BasicAck, none,
+ #state{ack_handler_pid = none} = State) ->
+ ?LOG_WARN("Channel (~p): received ~p but there is no "
+ "ack handler registered~n", [self(), BasicAck]),
+ {noreply, State};
+handle_method_from_server1(#'basic.ack'{} = BasicAck, none,
+ #state{ack_handler_pid = AckHandler} = State) ->
+ AckHandler ! BasicAck,
+ {noreply, State};
+handle_method_from_server1(Method, none, State) ->
+ {noreply, rpc_bottom_half(Method, State)};
+handle_method_from_server1(Method, Content, State) ->
+ {noreply, rpc_bottom_half({Method, Content}, State)}.
+
+%%---------------------------------------------------------------------------
+%% Other handle_* functions
+%%---------------------------------------------------------------------------
+
+handle_connection_closing(CloseType, State = #state{rpc_requests = RpcQueue,
+ closing = Closing}) ->
+ case {CloseType, Closing, queue:is_empty(RpcQueue)} of
+ {flush, false, false} ->
+ erlang:send_after(?TIMEOUT_FLUSH, self(),
+ timed_out_flushing_channel),
+ {noreply, State#state{closing = connection}};
+ {flush, just_channel, false} ->
+ erlang:send_after(?TIMEOUT_CLOSE_OK, self(),
+ timed_out_waiting_close_ok),
+ {noreply, State#state{closing = connection}};
+ _ ->
+ {stop, connection_closing, State}
+ end.
+
+handle_channel_exit(Reason, State) ->
+ case Reason of
+ %% Sent by rabbit_channel in the direct case
+ #amqp_error{name = ErrorName, explanation = Expl} ->
+ ?LOG_WARN("Channel (~p) closing: server sent error ~p~n",
+ [self(), Reason]),
+ {IsHard, Code, _} = ?PROTOCOL:lookup_amqp_exception(ErrorName),
+ {stop, {if IsHard -> server_initiated_hard_close;
+ true -> server_initiated_close
+ end, Code, Expl}, State};
+ %% Unexpected death of a channel infrastructure process
+ _ ->
+ {stop, {infrastructure_died, Reason}, State}
+ end.
+
+%%---------------------------------------------------------------------------
+%% Internal plumbing
+%%---------------------------------------------------------------------------
+
+do(Method, Content, #state{driver = Driver, writer = W}) ->
+ %% Catching because it expects the {channel_exit, _, _} message on error
+ catch case {Driver, Content} of
+ {network, none} -> rabbit_writer:send_command_sync(W, Method);
+ {network, _} -> rabbit_writer:send_command_sync(W, Method,
+ Content);
+ {direct, none} -> rabbit_channel:do(W, Method);
+ {direct, _} -> rabbit_channel:do(W, Method, Content)
+ end.
+
+start_writer(State = #state{start_writer_fun = SWF}) ->
+ {ok, Writer} = SWF(),
+ State#state{writer = Writer}.
+
+resolve_consumer(_ConsumerTag, #state{consumers = []}) ->
+ exit(no_consumers_registered);
+resolve_consumer(ConsumerTag, #state{consumers = Consumers,
+ default_consumer = DefaultConsumer}) ->
+ case dict:find(ConsumerTag, Consumers) of
+ {ok, Value} ->
+ Value;
+ error ->
+ case is_pid(DefaultConsumer) of
+ true -> DefaultConsumer;
+ false -> exit(unexpected_delivery_and_no_default_consumer)
+ end
+ end.
+
+register_consumer(ConsumerTag, Consumer,
+ State = #state{consumers = Consumers0}) ->
+ Consumers1 = dict:store(ConsumerTag, Consumer, Consumers0),
+ State#state{consumers = Consumers1}.
+
+unregister_consumer(ConsumerTag,
+ State = #state{consumers = Consumers0}) ->
+ Consumers1 = dict:erase(ConsumerTag, Consumers0),
+ State#state{consumers = Consumers1}.
+
+amqp_msg(none) ->
+ none;
+amqp_msg(Content) ->
+ {Props, Payload} = rabbit_basic:from_content(Content),
+ #amqp_msg{props = Props, payload = Payload}.
+
+build_content(none) ->
+ none;
+build_content(#amqp_msg{props = Props, payload = Payload}) ->
+ rabbit_basic:build_content(Props, Payload).
+
+check_block(_Method, _AmqpMsg, #state{closing = just_channel}) ->
+ closing;
+check_block(_Method, _AmqpMsg, #state{closing = connection}) ->
+ closing;
+check_block(_Method, none, #state{}) ->
+ ok;
+check_block(_Method, #amqp_msg{}, #state{flow_active = false}) ->
+ blocked;
+check_block(_Method, _AmqpMsg, #state{}) ->
+ ok.
+
+check_invalid_method(#'channel.open'{}) ->
+ {use_amqp_connection_module,
+ "Use amqp_connection:open_channel/{1,2} instead"};
+check_invalid_method(#'channel.close'{}) ->
+ {use_close_function, "Use close/{1,3} instead"};
+check_invalid_method(#'basic.consume'{}) ->
+ {use_subscribe_function, "Use subscribe/3 instead"};
+check_invalid_method(Method) ->
+ case is_connection_method(Method) of
+ true -> {connection_methods_not_allowed,
+ "Sending connection methods is not allowed"};
+ false -> ok
+ end.
+
+is_connection_method(Method) ->
+ {ClassId, _} = ?PROTOCOL:method_id(element(1, Method)),
+ ?PROTOCOL:lookup_class_name(ClassId) == connection.
+
+server_misbehaved(#amqp_error{} = AmqpError, State = #state{number = Number}) ->
+ case rabbit_binary_generator:map_exception(Number, AmqpError, ?PROTOCOL) of
+ {true, _, _} ->
+ {stop, {server_misbehaved, AmqpError}, State};
+ {false, _, Close} ->
+ ?LOG_WARN("Channel (~p) flushing and closing due to soft "
+ "error caused by the server ~p~n", [self(), AmqpError]),
+ Self = self(),
+ spawn(fun () -> call(Self, Close) end),
+ {noreply, State}
+ end.
85 src/amqp_channel_sup.erl
@@ -0,0 +1,85 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+%% @private
+-module(amqp_channel_sup).
+
+-include("amqp_client.hrl").
+
+-behaviour(supervisor2).
+
+-export([start_link/3]).
+-export([init/1]).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link(Type, InfraArgs, ChNumber) ->
+ {ok, Sup} = supervisor2:start_link(?MODULE, []),
+ {ok, ChPid} = supervisor2:start_child(
+ Sup, {channel, {amqp_channel, start_link,
+ [Type, ChNumber,
+ start_writer_fun(Sup, Type, InfraArgs,
+ ChNumber)]},
+ intrinsic, brutal_kill, worker, [amqp_channel]}),
+ {ok, AState} = init_command_assembler(Type),
+ {ok, Sup, {ChPid, AState}}.
+
+%%---------------------------------------------------------------------------
+%% Internal plumbing
+%%---------------------------------------------------------------------------
+
+start_writer_fun(Sup, direct, [User, VHost, Collector], ChNumber) ->
+ fun() ->
+ ChPid = self(),
+ {ok, _} = supervisor2:start_child(
+ Sup,
+ {rabbit_channel, {rabbit_channel, start_link,
+ [ChNumber, ChPid, ChPid, User, VHost,
+ Collector, start_limiter_fun(Sup)]},
+ transient, ?MAX_WAIT, worker, [rabbit_channel]})
+ end;
+start_writer_fun(Sup, network, [Sock], ChNumber) ->
+ fun() ->
+ ChPid = self(),
+ {ok, _} = supervisor2:start_child(
+ Sup,
+ {writer, {rabbit_writer, start_link,
+ [Sock, ChNumber, ?FRAME_MIN_SIZE, ?PROTOCOL,
+ ChPid]},
+ transient, ?MAX_WAIT, worker, [rabbit_writer]})
+ end.
+
+init_command_assembler(direct) -> {ok, none};
+init_command_assembler(network) -> rabbit_command_assembler:init(?PROTOCOL).
+
+start_limiter_fun(Sup) ->
+ fun (UnackedCount) ->
+ Parent = self(),
+ {ok, _} = supervisor2:start_child(
+ Sup,
+ {limiter, {rabbit_limiter, start_link,
+ [Parent, UnackedCount]},
+ transient, ?MAX_WAIT, worker, [rabbit_limiter]})
+ end.
+
+%%---------------------------------------------------------------------------
+%% supervisor2 callbacks
+%%---------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
44 src/amqp_channel_sup_sup.erl
@@ -0,0 +1,44 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+%% @private
+-module(amqp_channel_sup_sup).
+
+-include("amqp_client.hrl").
+
+-behaviour(supervisor2).
+
+-export([start_link/1, start_channel_sup/3]).
+-export([init/1]).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link(Type) ->
+ supervisor2:start_link(?MODULE, [Type]).
+
+start_channel_sup(Sup, InfraArgs, ChannelNumber) ->
+ supervisor2:start_child(Sup, [InfraArgs, ChannelNumber]).
+
+%%---------------------------------------------------------------------------
+%% supervisor2 callbacks
+%%---------------------------------------------------------------------------
+
+init([Type]) ->
+ {ok, {{simple_one_for_one, 0, 1},
+ [{channel_sup, {amqp_channel_sup, start_link, [Type]},
+ temporary, brutal_kill, supervisor, [amqp_channel_sup]}]}}.
239 src/amqp_channels_manager.erl
@@ -0,0 +1,239 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+%% @private
+-module(amqp_channels_manager).
+
+-include("amqp_client.hrl").
+
+-behaviour(gen_server).
+
+-export([start_link/2, open_channel/3, set_channel_max/2, is_empty/1,
+ num_channels/1, pass_frame/3, signal_connection_closing/2]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2]).
+
+-record(state, {connection,
+ channel_sup_sup,
+ map_num_pa = gb_trees:empty(), %% Number -> {Pid, AState}
+ map_pid_num = dict:new(), %% Pid -> Number
+ channel_max = ?MAX_CHANNEL_NUMBER,
+ closing = false}).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link(Connection, ChSupSup) ->
+ gen_server:start_link(?MODULE, [Connection, ChSupSup], []).
+
+open_channel(ChMgr, ProposedNumber, InfraArgs) ->
+ gen_server:call(ChMgr, {open_channel, ProposedNumber, InfraArgs}, infinity).
+
+set_channel_max(ChMgr, ChannelMax) ->
+ gen_server:cast(ChMgr, {set_channel_max, ChannelMax}).
+
+is_empty(ChMgr) ->
+ gen_server:call(ChMgr, is_empty, infinity).
+
+num_channels(ChMgr) ->
+ gen_server:call(ChMgr, num_channels, infinity).
+
+pass_frame(ChMgr, ChNumber, Frame) ->
+ gen_server:cast(ChMgr, {pass_frame, ChNumber, Frame}).
+
+signal_connection_closing(ChMgr, ChannelCloseType) ->
+ gen_server:cast(ChMgr, {connection_closing, ChannelCloseType}).
+
+%%---------------------------------------------------------------------------
+%% gen_server callbacks
+%%---------------------------------------------------------------------------
+
+init([Connection, ChSupSup]) ->
+ {ok, #state{connection = Connection, channel_sup_sup = ChSupSup}}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ State.
+
+handle_call({open_channel, ProposedNumber, InfraArgs}, _,
+ State = #state{closing = false}) ->
+ handle_open_channel(ProposedNumber, InfraArgs, State);
+handle_call(is_empty, _, State) ->
+ {reply, internal_is_empty(State), State};
+handle_call(num_channels, _, State) ->
+ {reply, internal_num_channels(State), State}.
+
+handle_cast({set_channel_max, ChannelMax}, State) ->
+ {noreply, State#state{channel_max = ChannelMax}};
+handle_cast({pass_frame, ChNumber, Frame}, State) ->
+ {noreply, internal_pass_frame(ChNumber, Frame, State)};
+handle_cast({connection_closing, ChannelCloseType}, State) ->
+ handle_connection_closing(ChannelCloseType, State).
+
+handle_info({'DOWN', _, process, Pid, Reason}, State) ->
+ handle_down(Pid, Reason, State).
+
+%%---------------------------------------------------------------------------
+%% Internal plumbing
+%%---------------------------------------------------------------------------
+
+handle_open_channel(ProposedNumber, InfraArgs,
+ State = #state{channel_sup_sup = ChSupSup}) ->
+ case new_number(ProposedNumber, State) of
+ {ok, Number} ->
+ {ok, _ChSup, {Ch, AState}} =
+ amqp_channel_sup_sup:start_channel_sup(ChSupSup, InfraArgs,
+ Number),
+ NewState = internal_register(Number, Ch, AState, State),
+ erlang:monitor(process, Ch),
+ {reply, {ok, Ch}, NewState};
+ {error, _} = Error ->
+ {reply, Error, State}
+ end.
+
+new_number(none, #state{channel_max = ChannelMax, map_num_pa = MapNPA}) ->
+ case gb_trees:is_empty(MapNPA) of
+ true -> {ok, 1};
+ false -> {Smallest, _} = gb_trees:smallest(MapNPA),
+ if Smallest > 1 ->
+ {ok, Smallest - 1};
+ true ->
+ {Largest, _} = gb_trees:largest(MapNPA),
+ if Largest < ChannelMax -> {ok, Largest + 1};
+ true -> find_free(MapNPA)
+ end
+ end
+ end;
+new_number(Proposed, State = #state{channel_max = ChannelMax,
+ map_num_pa = MapNPA}) ->
+ IsValid = Proposed > 0 andalso Proposed =< ChannelMax andalso
+ not gb_trees:is_defined(Proposed, MapNPA),
+ case IsValid of true -> {ok, Proposed};
+ false -> new_number(none, State)
+ end.
+
+find_free(MapNPA) ->
+ find_free(gb_trees:iterator(MapNPA), 1).
+
+find_free(It, Candidate) ->
+ case gb_trees:next(It) of
+ {Number, _, It1} -> if Number > Candidate ->
+ {ok, Number - 1};
+ Number =:= Candidate ->
+ find_free(It1, Candidate + 1)
+ end;
+ none -> {error, out_of_channel_numbers}
+ end.
+
+handle_down(Pid, Reason, State) ->
+ case internal_lookup_pn(Pid, State) of
+ undefined -> {stop, {error, unexpected_down}, State};
+ Number -> handle_channel_down(Pid, Number, Reason, State)
+ end.
+
+handle_channel_down(Pid, Number, Reason, State) ->
+ maybe_report_down(Pid, Reason, State),
+ NewState = internal_unregister(Number, Pid, State),
+ check_all_channels_terminated(NewState),
+ {noreply, NewState}.
+
+maybe_report_down(_Pid, normal, _State) ->
+ ok;
+maybe_report_down(_Pid, {app_initiated_close, _, _}, _State) ->
+ ok;
+maybe_report_down(_Pid, {server_initiated_close, _, _}, _State) ->
+ ok;
+maybe_report_down(_Pid, connection_closing, _State) ->
+ ok;
+maybe_report_down(Pid, {server_initiated_hard_close, _, _} = Reason,
+ #state{connection = Connection}) ->
+ amqp_gen_connection:hard_error_in_channel(Connection, Pid, Reason);
+maybe_report_down(_Pid, {server_misbehaved, AmqpError},
+ #state{connection = Connection}) ->
+ amqp_gen_connection:server_misbehaved(Connection, AmqpError);
+maybe_report_down(Pid, Other, #state{connection = Connection}) ->
+ amqp_gen_connection:channel_internal_error(Connection, Pid, Other).
+
+check_all_channels_terminated(#state{closing = false}) ->
+ ok;
+check_all_channels_terminated(State = #state{closing = true,
+ connection = Connection}) ->
+ case internal_is_empty(State) of
+ true -> amqp_gen_connection:channels_terminated(Connection);
+ false -> ok
+ end.
+
+handle_connection_closing(ChannelCloseType,
+ State = #state{connection = Connection}) ->
+ case internal_is_empty(State) of
+ true -> amqp_gen_connection:channels_terminated(Connection);
+ false -> signal_channels_connection_closing(ChannelCloseType, State)
+ end,
+ {noreply, State#state{closing = true}}.
+
+%%---------------------------------------------------------------------------
+
+internal_pass_frame(Number, Frame, State) ->
+ case internal_lookup_npa(Number, State) of
+ undefined ->
+ ?LOG_INFO("Dropping frame ~p for invalid or closed "
+ "channel number ~p~n", [Frame, Number]);
+ {ChPid, AState} ->
+ NewAState = rabbit_reader:process_channel_frame(
+ Frame, ChPid, Number, ChPid, AState),
+ internal_update_npa(Number, ChPid, NewAState, State)
+ end.
+
+internal_register(Number, Pid, AState,
+ State = #state{map_num_pa = MapNPA, map_pid_num = MapPN}) ->
+ MapNPA1 = gb_trees:enter(Number, {Pid, AState}, MapNPA),
+ MapPN1 = dict:store(Pid, Number, MapPN),
+ State#state{map_num_pa = MapNPA1,
+ map_pid_num = MapPN1}.
+
+internal_unregister(Number, Pid,
+ State = #state{map_num_pa = MapNPA, map_pid_num = MapPN}) ->
+ MapNPA1 = gb_trees:delete(Number, MapNPA),
+ MapPN1 = dict:erase(Pid, MapPN),
+ State#state{map_num_pa = MapNPA1,
+ map_pid_num = MapPN1}.
+
+internal_is_empty(#state{map_num_pa = MapNPA}) ->
+ gb_trees:is_empty(MapNPA).
+
+internal_num_channels(#state{map_num_pa = MapNPA}) ->
+ gb_trees:size(MapNPA).
+
+internal_lookup_npa(Number, #state{map_num_pa = MapNPA}) ->
+ case gb_trees:lookup(Number, MapNPA) of {value, PA} -> PA;
+ none -> undefined
+ end.
+
+internal_lookup_pn(Pid, #state{map_pid_num = MapPN}) ->
+ case dict:find(Pid, MapPN) of {ok, Number} -> Number;
+ error -> undefined
+ end.
+
+internal_update_npa(Number, Pid, AState, State = #state{map_num_pa = MapNPA}) ->
+ State#state{map_num_pa = gb_trees:update(Number, {Pid, AState}, MapNPA)}.
+
+signal_channels_connection_closing(ChannelCloseType,
+ #state{map_pid_num = MapPN}) ->
+ [amqp_channel:connection_closing(Pid, ChannelCloseType)
+ || Pid <- dict:fetch_keys(MapPN)].
8 src/amqp_client.app.src
@@ -0,0 +1,8 @@
+{application, amqp_client,
+ [{description, "RabbitMQ AMQP Client"},
+ {vsn, "2.2.0"},
+ {modules, []},
+ {registered, [amqp_sup]},
+ {env, []},
+ {mod, {amqp_client, []}},
+ {applications, [kernel, stdlib]}]}.
40 src/amqp_client.erl
@@ -0,0 +1,40 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+%% @private
+-module(amqp_client).
+
+-behaviour(application).
+
+-export([start/0]).
+-export([start/2, stop/1]).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start() ->
+ application:start(amqp_client).
+
+%%---------------------------------------------------------------------------
+%% application callbacks
+%%---------------------------------------------------------------------------
+
+start(_StartType, _StartArgs) ->
+ amqp_sup:start_link().
+
+stop(_State) ->
+ ok.
206 src/amqp_connection.erl
@@ -0,0 +1,206 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+%% @doc This module is responsible for maintaining a connection to an AMQP
+%% broker and manages channels within the connection. This module is used to
+%% open and close connections to the broker as well as creating new channels
+%% within a connection.<br/>
+%% The connections and channels created by this module are supervised under
+%% amqp_client's supervision tree. Please note that connections and channels
+%% do not get restarted automatically by the supervision tree in the case of a
+%% failure. If you need robust connections and channels, we recommend you use
+%% Erlang monitors on the returned connection and channel PID.
+-module(amqp_connection).
+
+-include("amqp_client.hrl").
+
+-export([open_channel/1, open_channel/2]).
+-export([start/1, start/2]).
+-export([close/1, close/3]).
+-export([info/2, info_keys/1, info_keys/0]).
+
+%%---------------------------------------------------------------------------
+%% Type Definitions
+%%---------------------------------------------------------------------------
+
+%% @type amqp_params() = #amqp_params{}.
+%% As defined in amqp_client.hrl. It contains the following fields:
+%% <ul>
+%% <li>username :: binary() - The name of a user registered with the broker,
+%% defaults to &lt;&lt;guest"&gt;&gt;</li>
+%% <li>password :: binary() - The user's password, defaults to
+%% &lt;&lt;"guest"&gt;&gt;</li>
+%% <li>virtual_host :: binary() - The name of a virtual host in the broker,
+%% defaults to &lt;&lt;"/"&gt;&gt;</li>
+%% <li>host :: string() - The hostname of the broker,
+%% defaults to "localhost"</li>
+%% <li>port :: integer() - The port the broker is listening on,
+%% defaults to 5672</li>
+%% <li>channel_max :: non_neg_integer() - The channel_max handshake parameter,
+%% defaults to 0</li>
+%% <li>frame_max :: non_neg_integer() - The frame_max handshake parameter,
+%% defaults to 0</li>
+%% <li>heartbeat :: non_neg_integer() - The hearbeat interval in seconds,
+%% defaults to 0 (turned off)</li>
+%% <li>ssl_options :: term() - The second parameter to be used with the
+%% ssl:connect/2 function, defaults to 'none'</li>
+%% <li>client_properties :: [{binary(), atom(), binary()}] - A list of extra
+%% client properties to be sent to the server, defaults to []</li>
+%% </ul>
+
+%%---------------------------------------------------------------------------
+%% Starting a connection
+%%---------------------------------------------------------------------------
+
+%% @spec (Type) -> {ok, Connection} | {error, Error}
+%% where
+%% Type = network | direct
+%% Connection = pid()
+%% @doc Starts a connection to an AMQP server. Use network type to connect
+%% to a remote AMQP server - default connection settings are used, meaning that
+%% the server is expected to be at localhost:5672, with a vhost of "/"
+%% authorising a user guest/guest. Use direct type for a direct connection to
+%% a RabbitMQ server, assuming that the server is running in the same process
+%% space, and with a default set of amqp_params. If a different host, port,
+%% vhost or credential set is required, start/2 should be used.
+start(Type) ->
+ start(Type, #amqp_params{}).
+
+%% @spec (Type, amqp_params()) -> {ok, Connection} | {error, Error}
+%% where
+%% Type = network | direct
+%% Connection = pid()
+%% @doc Starts a connection to an AMQP server. Use network type to connect
+%% to a remote AMQP server or direct type for a direct connection to
+%% a RabbitMQ server, assuming that the server is running in the same process
+%% space.
+start(Type, AmqpParams) ->
+ case amqp_client:start() of
+ ok -> ok;
+ {error, {already_started, amqp_client}} -> ok;
+ {error, _} = E -> throw(E)
+ end,
+ {ok, _Sup, Connection} =
+ amqp_sup:start_connection_sup(
+ Type, case Type of direct -> amqp_direct_connection;
+ network -> amqp_network_connection
+ end, AmqpParams),
+ amqp_gen_connection:connect(Connection).
+
+%%---------------------------------------------------------------------------
+%% Commands
+%%---------------------------------------------------------------------------
+
+%% @doc Invokes open_channel(ConnectionPid, none).
+%% Opens a channel without having to specify a channel number.
+open_channel(ConnectionPid) ->
+ open_channel(ConnectionPid, none).
+
+%% @spec (ConnectionPid, ChannelNumber) -> {ok, ChannelPid} | {error, Error}
+%% where
+%% ChannelNumber = pos_integer() | 'none'
+%% ConnectionPid = pid()
+%% ChannelPid = pid()
+%% @doc Opens an AMQP channel.<br/>
+%% This function assumes that an AMQP connection (networked or direct)
+%% has already been successfully established.<br/>
+%% ChannelNumber must be less than or equal to the negotiated max_channel value,
+%% or less than or equal to ?MAX_CHANNEL_NUMBER if the negotiated max_channel
+%% value is 0.<br/>
+%% In the direct connection, max_channel is always 0.
+open_channel(ConnectionPid, ChannelNumber) ->
+ amqp_gen_connection:open_channel(ConnectionPid, ChannelNumber).
+
+%% @spec (ConnectionPid) -> ok | Error
+%% where
+%% ConnectionPid = pid()
+%% @doc Closes the channel, invokes
+%% close(Channel, 200, &lt;&lt;"Goodbye"&gt;&gt;).
+close(ConnectionPid) ->
+ close(ConnectionPid, 200, <<"Goodbye">>).
+
+%% @spec (ConnectionPid, Code, Text) -> ok | closing
+%% where
+%% ConnectionPid = pid()
+%% Code = integer()
+%% Text = binary()
+%% @doc Closes the AMQP connection, allowing the caller to set the reply
+%% code and text.
+close(ConnectionPid, Code, Text) ->
+ Close = #'connection.close'{reply_text = Text,
+ reply_code = Code,
+ class_id = 0,
+ method_id = 0},
+ amqp_gen_connection:close(ConnectionPid, Close).
+
+%%---------------------------------------------------------------------------
+%% Other functions
+%%---------------------------------------------------------------------------
+
+%% @spec (ConnectionPid, Items) -> ResultList
+%% where
+%% ConnectionPid = pid()
+%% Items = [Item]
+%% ResultList = [{Item, Result}]
+%% Item = atom()
+%% Result = term()
+%% @doc Returns information about the connection, as specified by the Items
+%% list. Item may be any atom returned by info_keys/1:
+%%<ul>
+%%<li>type - returns the type of the connection (network or direct)</li>
+%%<li>server_properties - returns the server_properties fields sent by the
+%% server while establishing the connection</li>
+%%<li>is_closing - returns true if the connection is in the process of closing
+%% and false otherwise</li>
+%%<li>amqp_params - returns the #amqp_params{} structure used to start the
+%% connection</li>
+%%<li>num_channels - returns the number of channels currently open under the
+%% connection (excluding channel 0)</li>
+%%<li>channel_max - returns the channel_max value negotiated with the
+%% server</li>
+%%<li>heartbeat - returns the heartbeat value negotiated with the server
+%% (only for the network connection)</li>
+%%<li>frame_max - returns the frame_max value negotiated with the
+%% server (only for the network connection)</li>
+%%<li>sock - returns the socket for the network connection (for use with
+%% e.g. inet:sockname/1) (only for the network connection)</li>
+%%<li>any other value - throws an exception</li>
+%%</ul>
+info(ConnectionPid, Items) ->
+ amqp_gen_connection:info(ConnectionPid, Items).
+
+%% @spec (ConnectionPid) -> Items
+%% where
+%% ConnectionPid = pid()
+%% Items = [Item]
+%% Item = atom()
+%% @doc Returns a list of atoms that can be used in conjunction with info/2.
+%% Note that the list differs from a type of connection to another (network vs.
+%% direct). Use info_keys/0 to get a list of info keys that can be used for
+%% any connection.
+info_keys(ConnectionPid) ->
+ amqp_gen_connection:info_keys(ConnectionPid).
+
+%% @spec () -> Items
+%% where
+%% Items = [Item]
+%% Item = atom()
+%% @doc Returns a list of atoms that can be used in conjunction with info/2.
+%% These are general info keys, which can be used in any type of connection.
+%% Other info keys may exist for a specific type. To get the full list of
+%% atoms that can be used for a certain connection, use info_keys/1.
+info_keys() ->
+ amqp_gen_connection:info_keys().
94 src/amqp_connection_sup.erl
@@ -0,0 +1,94 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+%% @private
+-module(amqp_connection_sup).
+
+-include("amqp_client.hrl").
+
+-behaviour(supervisor2).
+
+-export([start_link/3]).
+-export([init/1]).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link(Type, Module, AmqpParams) ->
+ {ok, Sup} = supervisor2:start_link(?MODULE, []),
+ {ok, ChSupSup} = supervisor2:start_child(
+ Sup,
+ {channel_sup_sup, {amqp_channel_sup_sup, start_link,
+ [Type]},
+ intrinsic, infinity, supervisor,
+ [amqp_channel_sup_sup]}),
+ SChMF = start_channels_manager_fun(Sup, ChSupSup),
+ SIF = start_infrastructure_fun(Sup, Type),
+ {ok, Connection} = supervisor2:start_child(
+ Sup,
+ {connection, {amqp_gen_connection, start_link,
+ [Module, AmqpParams, SIF, SChMF, []]},
+ intrinsic, brutal_kill, worker,
+ [amqp_gen_connection]}),
+ {ok, Sup, Connection}.
+
+%%---------------------------------------------------------------------------
+%% Internal plumbing
+%%---------------------------------------------------------------------------
+
+start_infrastructure_fun(Sup, network) ->
+ fun (Sock, ChMgr) ->
+ Connection = self(),
+ {ok, CTSup, {MainReader, AState, Writer}} =
+ supervisor2:start_child(
+ Sup,
+ {connection_type_sup, {amqp_connection_type_sup,
+ start_link_network,
+ [Sock, Connection, ChMgr]},
+ transient, infinity, supervisor,
+ [amqp_connection_type_sup]}),
+ {ok, {MainReader, AState, Writer,
+ amqp_connection_type_sup:start_heartbeat_fun(CTSup)}}
+ end;
+start_infrastructure_fun(Sup, direct) ->
+ fun () ->
+ {ok, _CTSup, Collector} =
+ supervisor2:start_child(
+ Sup,
+ {connection_type_sup, {amqp_connection_type_sup,
+ start_link_direct, []},
+ transient, infinity, supervisor,
+ [amqp_connection_type_sup]}),
+ {ok, Collector}
+ end.
+
+start_channels_manager_fun(Sup, ChSupSup) ->
+ fun () ->
+ Connection = self(),
+ {ok, _} = supervisor2:start_child(
+ Sup,
+ {channels_manager, {amqp_channels_manager, start_link,
+ [Connection, ChSupSup]},
+ transient, ?MAX_WAIT, worker, [amqp_channels_manager]})
+ end.
+
+%%---------------------------------------------------------------------------
+%% supervisor2 callbacks
+%%---------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
65 src/amqp_connection_type_sup.erl
@@ -0,0 +1,65 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+%% @private
+-module(amqp_connection_type_sup).
+
+-include("amqp_client.hrl").
+
+-behaviour(supervisor2).
+
+-export([start_link_direct/0, start_link_network/3, start_heartbeat_fun/1]).
+-export([init/1]).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link_direct() ->
+ {ok, Sup} = supervisor2:start_link(?MODULE, []),
+ {ok, Collector} =
+ supervisor2:start_child(
+ Sup,
+ {collector, {rabbit_queue_collector, start_link, []},
+ transient, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
+ {ok, Sup, Collector}.
+
+start_link_network(Sock, Connection, ChMgr) ->
+ {ok, Sup} = supervisor2:start_link(?MODULE, []),
+ {ok, AState} = rabbit_command_assembler:init(?PROTOCOL),
+ {ok, Writer} =
+ supervisor2:start_child(
+ Sup,
+ {writer, {rabbit_writer, start_link,
+ [Sock, 0, ?FRAME_MIN_SIZE, ?PROTOCOL, Connection]},
+ transient, ?MAX_WAIT, worker, [rabbit_writer]}),
+ {ok, MainReader} =
+ supervisor2:start_child(
+ Sup,
+ {main_reader, {amqp_main_reader, start_link,
+ [Sock, Connection, ChMgr, AState]},
+ transient, ?MAX_WAIT, worker, [amqp_main_reader]}),
+ {ok, Sup, {MainReader, AState, Writer}}.
+
+start_heartbeat_fun(SupPid) ->
+ rabbit_heartbeat:start_heartbeat_fun(SupPid).
+
+%%---------------------------------------------------------------------------
+%% supervisor2 callbacks
+%%---------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
91 src/amqp_direct_connection.erl
@@ -0,0 +1,91 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+%% @private
+-module(amqp_direct_connection).
+
+-include("amqp_client.hrl").
+
+-behaviour(amqp_gen_connection).
+
+-export([init/1, terminate/2, connect/4, do/2, open_channel_args/1, i/2,
+ info_keys/0, handle_message/2, closing/3, channels_terminated/1]).
+
+-record(state, {user,
+ vhost,
+ collector,
+ closing_reason %% undefined | Reason
+ }).
+
+-define(INFO_KEYS, [type]).
+
+%%---------------------------------------------------------------------------
+
+init([]) ->
+ {ok, #state{}}.
+
+open_channel_args(#state{user = User, vhost = VHost, collector = Collector}) ->
+ [User, VHost, Collector].
+
+do(_Method, _State) ->
+ ok.
+
+handle_message(Msg, State) ->
+ {stop, {unexpected_msg, Msg}, State}.
+
+closing(_ChannelCloseType, Reason, State) ->
+ {ok, State#state{closing_reason = Reason}}.
+
+channels_terminated(State = #state{closing_reason = Reason,
+ collector = Collector}) ->
+ rabbit_queue_collector:delete_all(Collector),
+ {stop, Reason, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+i(type, _State) -> direct;
+i(Item, _State) -> throw({bad_argument, Item}).
+
+info_keys() ->
+ ?INFO_KEYS.
+
+connect(AmqpParams, SIF, _ChMgr, State) ->
+ try do_connect(AmqpParams, SIF, State) of
+ Return -> Return
+ catch _:Reason -> {error, Reason}
+ end.
+
+do_connect(#amqp_params{username = Username, password = Pass,
+ virtual_host = VHost},
+ SIF, State) ->
+ case lists:keymember(rabbit, 1, application:which_applications()) of
+ true -> ok;
+ false -> exit(broker_not_found_in_vm)
+ end,
+ User = try rabbit_access_control:user_pass_login(Username, Pass) of
+ User1 -> User1
+ catch exit:#amqp_error{name = access_refused} -> exit(auth_failure)
+ end,
+ try rabbit_access_control:check_vhost_access(User, VHost) of
+ _ -> ok
+ catch exit:#amqp_error{name = access_refused} -> exit(access_refused)
+ end,
+ {ok, Collector} = SIF(),
+ {ok, {rabbit_reader:server_properties(), 0,
+ State#state{user = User,
+ vhost = VHost,
+ collector = Collector}}}.
339 src/amqp_gen_connection.erl
@@ -0,0 +1,339 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+%% @private
+-module(amqp_gen_connection).
+
+-include("amqp_client.hrl").
+
+-behaviour(gen_server).
+
+-export([start_link/5, connect/1, open_channel/2, hard_error_in_channel/3,
+ channel_internal_error/3, server_misbehaved/2, channels_terminated/1,
+ close/2, info/2, info_keys/0, info_keys/1]).
+-export([behaviour_info/1]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2]).
+
+-define(CLIENT_CLOSE_TIMEOUT, 60000).
+
+-define(INFO_KEYS, [server_properties, is_closing, amqp_params, num_channels,
+ channel_max]).
+
+-record(state, {module,
+ module_state,
+ sup,
+ channels_manager,
+ amqp_params,
+ channel_max,
+ server_properties,
+ start_infrastructure_fun,
+ start_channels_manager_fun,
+ closing = false %% #closing{} | false
+ }).
+
+-record(closing, {reason,
+ close,
+ from = none}).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link(Mod, AmqpParams, SIF, SChMF, ExtraParams) ->
+ gen_server:start_link(?MODULE,
+ [Mod, self(), AmqpParams, SIF, SChMF, ExtraParams],
+ []).
+
+connect(Pid) ->
+ gen_server:call(Pid, connect, infinity).
+
+open_channel(Pid, ProposedNumber) ->
+ case gen_server:call(Pid, {command, {open_channel, ProposedNumber}},
+ infinity) of
+ {ok, ChannelPid} -> ok = amqp_channel:open(ChannelPid),
+ {ok, ChannelPid};
+ Error -> Error
+ end.
+
+hard_error_in_channel(Pid, ChannelPid, Reason) ->
+ gen_server:cast(Pid, {hard_error_in_channel, ChannelPid, Reason}).
+
+channel_internal_error(Pid, ChannelPid, Reason) ->
+ gen_server:cast(Pid, {channel_internal_error, ChannelPid, Reason}).
+
+server_misbehaved(Pid, AmqpError) ->
+ gen_server:cast(Pid, {server_misbehaved, AmqpError}).
+
+channels_terminated(Pid) ->
+ gen_server:cast(Pid, channels_terminated).
+
+close(Pid, Close) ->
+ gen_server:call(Pid, {command, {close, Close}}, infinity).
+
+info(Pid, Items) ->
+ gen_server:call(Pid, {info, Items}, infinity).
+
+info_keys() ->
+ ?INFO_KEYS.
+
+info_keys(Pid) ->
+ gen_server:call(Pid, info_keys, infinity).
+
+%%---------------------------------------------------------------------------
+%% Behaviour
+%%---------------------------------------------------------------------------
+
+behaviour_info(callbacks) ->
+ [
+ %% init(Params) -> {ok, InitialState}
+ {init, 1},
+
+ %% terminate(Reason, FinalState) -> Ignored
+ {terminate, 2},
+
+ %% connect(AmqpParams, SIF, ChMgr, State) ->
+ %% {ok, ConnectParams} | {closing, ConnectParams, AmqpError, Reply} |
+ %% {error, Error}
+ %% where
+ %% ConnectParams = {ServerProperties, ChannelMax, NewState}
+ {connect, 4},
+
+ %% do(Method, State) -> Ignored
+ {do, 2},
+
+ %% open_channel_args(State) -> OpenChannelArgs
+ {open_channel_args, 1},
+
+ %% i(InfoItem, State) -> Info
+ {i, 2},
+
+ %% info_keys() -> [InfoItem]
+ {info_keys, 0},
+
+ %% CallbackReply = {ok, NewState} | {stop, Reason, FinalState}
+
+ %% handle_message(Message, State) -> CallbackReply
+ {handle_message, 2},
+
+ %% closing(flush|abrupt, Reason, State) -> CallbackReply
+ {closing, 3},
+
+ %% channels_terminated(State) -> CallbackReply
+ {channels_terminated, 1}
+ ];
+behaviour_info(_Other) ->
+ undefined.
+
+callback(Function, Params, State = #state{module = Mod,
+ module_state = MState}) ->
+ case erlang:apply(Mod, Function, Params ++ [MState]) of
+ {ok, NewMState} -> {noreply,
+ State#state{module_state = NewMState}};
+ {stop, Reason, NewMState} -> {stop, Reason,
+ State#state{module_state = NewMState}}
+ end.
+
+%%---------------------------------------------------------------------------
+%% gen_server callbacks
+%%---------------------------------------------------------------------------
+
+init([Mod, Sup, AmqpParams, SIF, SChMF, ExtraParams]) ->
+ {ok, MState} = Mod:init(ExtraParams),
+ {ok, #state{module = Mod,
+ module_state = MState,
+ sup = Sup,
+ amqp_params = AmqpParams,
+ start_infrastructure_fun = SIF,
+ start_channels_manager_fun = SChMF}}.
+
+handle_call(connect, _From,
+ State0 = #state{module = Mod,
+ module_state = MState,
+ amqp_params = AmqpParams,
+ start_infrastructure_fun = SIF,
+ start_channels_manager_fun = SChMF}) ->
+ {ok, ChMgr} = SChMF(),
+ State1 = State0#state{channels_manager = ChMgr},
+ case Mod:connect(AmqpParams, SIF, ChMgr, MState) of
+ {ok, Params} ->
+ {reply, {ok, self()}, after_connect(Params, State1)};
+ {closing, Params, #amqp_error{} = AmqpError, Error} ->
+ server_misbehaved(self(), AmqpError),
+ {reply, Error, after_connect(Params, State1)};
+ {error, _} = Error ->
+ {stop, Error, Error, State0}
+ end;
+handle_call({command, Command}, From, State = #state{closing = Closing}) ->
+ case Closing of false -> handle_command(Command, From, State);
+ _ -> {reply, closing, State}
+ end;
+handle_call({info, Items}, _From, State) ->
+ {reply, [{Item, i(Item, State)} || Item <- Items], State};
+handle_call(info_keys, _From, State = #state{module = Mod}) ->
+ {reply, ?INFO_KEYS ++ Mod:info_keys(), State}.
+
+after_connect({ServerProperties, ChannelMax, NewMState},
+ State = #state{channels_manager = ChMgr}) ->
+ case ChannelMax of
+ 0 -> ok;
+ _ -> amqp_channels_manager:set_channel_max(ChMgr, ChannelMax)
+ end,
+ State#state{server_properties = ServerProperties,
+ channel_max = ChannelMax,
+ module_state = NewMState}.
+
+handle_cast({method, Method, none}, State) ->
+ handle_method(Method, State);
+handle_cast(channels_terminated, State) ->
+ handle_channels_terminated(State);
+handle_cast({hard_error_in_channel, Pid, Reason}, State) ->
+ ?LOG_WARN("Connection (~p) closing: channel (~p) received hard error ~p "
+ "from server~n", [self(), Pid, Reason]),
+ {stop, Reason, State};
+handle_cast({channel_internal_error, Pid, Reason}, State) ->
+ ?LOG_WARN("Connection (~p) closing: internal error in channel (~p): ~p~n",
+ [self(), Pid, Reason]),
+ internal_error(State);
+handle_cast({server_misbehaved, AmqpError}, State) ->
+ ?LOG_WARN("Connection (~p) closing: server misbehaved: ~p~n",
+ [self(), AmqpError]),
+ server_misbehaved_close(AmqpError, State).
+
+handle_info(Info, State) ->
+ callback(handle_message, [Info], State).
+
+terminate(Reason, #state{module = Mod, module_state = MState}) ->
+ Mod:terminate(Reason, MState).
+
+code_change(_OldVsn, State, _Extra) ->
+ State.
+
+%%---------------------------------------------------------------------------
+%% Infos
+%%---------------------------------------------------------------------------
+
+i(server_properties, State) -> State#state.server_properties;
+i(is_closing, State) -> State#state.closing =/= false;
+i(amqp_params, State) -> State#state.amqp_params;
+i(channel_max, State) -> State#state.channel_max;
+i(num_channels, State) -> amqp_channels_manager:num_channels(
+ State#state.channels_manager);
+i(Item, #state{module = Mod, module_state = MState}) -> Mod:i(Item, MState).
+
+%%---------------------------------------------------------------------------
+%% Command handling
+%%---------------------------------------------------------------------------
+
+handle_command({open_channel, ProposedNumber}, _From,
+ State = #state{channels_manager = ChMgr,
+ module = Mod,
+ module_state = MState}) ->
+ {reply, amqp_channels_manager:open_channel(ChMgr, ProposedNumber,
+ Mod:open_channel_args(MState)),
+ State};
+ handle_command({close, #'connection.close'{} = Close}, From, State) ->
+ app_initiated_close(Close, From, State).
+
+%%---------------------------------------------------------------------------
+%% Handling methods from broker
+%%---------------------------------------------------------------------------
+
+handle_method(#'connection.close'{} = Close, State) ->
+ server_initiated_close(Close, State);
+handle_method(#'connection.close_ok'{}, State = #state{closing = Closing}) ->
+ case Closing of #closing{from = none} -> ok;
+ #closing{from = From} -> gen_server:reply(From, ok)
+ end,
+ {stop, closing_to_reason(Closing), State};
+handle_method(Other, State) ->
+ server_misbehaved_close(#amqp_error{name = command_invalid,
+ explanation = "unexpected method on "
+ "channel 0",
+ method = element(1, Other)},
+ State).
+
+%%---------------------------------------------------------------------------
+%% Closing
+%%---------------------------------------------------------------------------
+
+app_initiated_close(Close, From, State) ->
+ set_closing_state(flush, #closing{reason = app_initiated_close,
+ close = Close,
+ from = From}, State).
+
+internal_error(State) ->
+ Close = #'connection.close'{reply_text = <<>>,
+ reply_code = ?INTERNAL_ERROR,
+ class_id = 0,
+ method_id = 0},
+ set_closing_state(abrupt, #closing{reason = internal_error, close = Close},
+ State).
+
+server_initiated_close(Close, State) ->
+ set_closing_state(abrupt, #closing{reason = server_initiated_close,
+ close = Close}, State).
+
+server_misbehaved_close(AmqpError, State) ->
+ {true, 0, Close} =
+ rabbit_binary_generator:map_exception(0, AmqpError, ?PROTOCOL),
+ set_closing_state(abrupt, #closing{reason = server_misbehaved,
+ close = Close}, State).
+
+set_closing_state(ChannelCloseType, NewClosing,
+ State = #state{channels_manager = ChMgr,
+ closing = CurClosing}) ->
+ amqp_channels_manager:signal_connection_closing(ChMgr, ChannelCloseType),
+ ResClosing =
+ case closing_priority(NewClosing) =< closing_priority(CurClosing) of
+ true -> NewClosing;
+ false -> CurClosing
+ end,
+ callback(closing, [ChannelCloseType, closing_to_reason(ResClosing)],
+ State#state{closing = ResClosing}).
+
+closing_priority(false) -> 99;
+closing_priority(#closing{reason = app_initiated_close}) -> 4;
+closing_priority(#closing{reason = internal_error}) -> 3;
+closing_priority(#closing{reason = server_misbehaved}) -> 2;
+closing_priority(#closing{reason = server_initiated_close}) -> 1.
+
+closing_to_reason(#closing{close = #'connection.close'{reply_code = 200}}) ->
+ normal;
+closing_to_reason(#closing{reason = Reason,
+ close = #'connection.close'{reply_code = Code,
+ reply_text = Text}}) ->
+ {Reason, Code, Text}.
+
+handle_channels_terminated(State = #state{closing = Closing,
+ module = Mod,
+ module_state = MState}) ->
+ #closing{reason = Reason, close = Close, from = From} = Closing,
+ case Reason of
+ server_initiated_close ->
+ Mod:do(#'connection.close_ok'{}, MState);
+ _ ->
+ Mod:do(Close, MState),
+ erlang:send_after(?CLIENT_CLOSE_TIMEOUT, self(),
+ {'$gen_cast', timeout_waiting_for_close_ok})
+ end,
+ case callback(channels_terminated, [], State) of
+ {stop, _, _} = Stop -> case From of none -> ok;
+ _ -> gen_server:reply(From, ok)
+ end,
+ Stop;
+ Other -> Other
+ end.
111 src/amqp_main_reader.erl
@@ -0,0 +1,111 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+%% @private
+-module(amqp_main_reader).
+
+-include("amqp_client.hrl").
+
+-behaviour(gen_server).
+
+-export([start_link/4]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2]).
+
+-record(state, {sock,
+ connection,
+ channels_manager,
+ astate,
+ message = none %% none | {Type, Channel, Length}
+ }).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link(Sock, Connection, ChMgr, AState) ->
+ gen_server:start_link(?MODULE, [Sock, Connection, ChMgr, AState], []).
+
+%%---------------------------------------------------------------------------
+%% gen_server callbacks
+%%---------------------------------------------------------------------------
+
+init([Sock, Connection, ChMgr, AState]) ->
+ {ok, _Ref} = rabbit_net:async_recv(Sock, 7, infinity),
+ {ok, #state{sock = Sock, connection = Connection,
+ channels_manager = ChMgr, astate = AState}}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ State.
+
+handle_call(Call, From, State) ->
+ {stop, {unexpected_call, Call, From}, State}.
+
+handle_cast(Cast, State) ->
+ {stop, {unexpected_cast, Cast}, State}.
+
+handle_info({inet_async, _, _, _} = InetAsync, State) ->
+ handle_inet_async(InetAsync, State).
+
+%%---------------------------------------------------------------------------
+%% Internal plumbing
+%%---------------------------------------------------------------------------
+
+handle_inet_async({inet_async, Sock, _, Msg},
+ State = #state{sock = Sock, message = CurMessage}) ->
+ {Type, Number, Length} = case CurMessage of {T, N, L} -> {T, N, L};
+ none -> {none, none, none}
+ end,
+ case Msg of
+ {ok, <<Payload:Length/binary, ?FRAME_END>>} ->
+ State1 = process_frame(Type, Number, Payload, State),
+ {ok, _Ref} = rabbit_net:async_recv(Sock, 7, infinity),
+ {noreply, State1#state{message = none}};
+ {ok, <<NewType:8, NewChannel:16, NewLength:32>>} ->
+ {ok, _Ref} = rabbit_net:async_recv(Sock, NewLength + 1, infinity),
+ {noreply, State#state{message={NewType, NewChannel, NewLength}}};
+ {error, closed} ->
+ State#state.connection ! socket_closed,
+ {noreply, State};
+ {error, Reason} ->
+ State#state.connection ! {socket_error, Reason},
+ {stop, {socket_error, Reason}, State}
+ end.
+
+process_frame(Type, ChNumber, Payload, State = #state{connection = Connection}) ->
+ case rabbit_command_assembler:analyze_frame(Type, Payload, ?PROTOCOL) of
+ heartbeat when ChNumber /= 0 ->
+ amqp_gen_connection:server_misbehaved(
+ Connection,
+ #amqp_error{name = command_invalid,
+ explanation = "heartbeat on non-zero channel"}),
+ State;
+ %% Match heartbeats but don't do anything with them
+ heartbeat ->
+ State;
+ AnalyzedFrame ->
+ pass_frame(ChNumber, AnalyzedFrame, State)
+ end.
+
+pass_frame(0, Frame, State = #state{connection = Conn, astate = AState}) ->
+ State#state{astate = rabbit_reader:process_channel_frame(Frame, Conn,
+ 0, Conn, AState)};
+pass_frame(Number, Frame, State = #state{channels_manager = ChMgr}) ->
+ amqp_channels_manager:pass_frame(ChMgr, Number, Frame),
+ State.
279 src/amqp_network_connection.erl
@@ -0,0 +1,279 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%%
+
+%% @private
+-module(amqp_network_connection).
+
+-include("amqp_client.hrl").
+
+-behaviour(amqp_gen_connection).
+
+-export([init/1, terminate/2, connect/4, do/2, open_channel_args/1, i/2,
+ info_keys/0, handle_message/2, closing/3, channels_terminated/1]).
+
+-define(RABBIT_TCP_OPTS, [binary, {packet, 0}, {active,false}, {nodelay, true}]).
+-define(SOCKET_CLOSING_TIMEOUT, 1000).
+-define(HANDSHAKE_RECEIVE_TIMEOUT, 60000).
+
+-record(state, {sock,
+ heartbeat,
+ writer0,
+ frame_max,
+ closing_reason, %% undefined | Reason
+ waiting_socket_close = false}).
+
+-define(INFO_KEYS, [type, heartbeat, frame_max, sock]).
+
+%%---------------------------------------------------------------------------
+
+init([]) ->
+ {ok, #state{}}.
+
+open_channel_args(#state{sock = Sock}) ->
+ [Sock].
+
+do(#'connection.close_ok'{} = CloseOk, State) ->
+ erlang:send_after(?SOCKET_CLOSING_TIMEOUT, self(), socket_closing_timeout),
+