Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

made from rabbitmq-erlang-client rabbitmq_v2_1_0

  • Loading branch information...
commit 16fb9982b9c85bb6c8fdd6f01d5c85bd60508735 0 parents
David Reid authored
2  .gitignore
@@ -0,0 +1,2 @@
+deps
+ebin
51 include/amqp_client.hrl
@@ -0,0 +1,51 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is the RabbitMQ Erlang Client.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd.,
+%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd., Cohesive Financial
+%% Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
+%% 2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
+%% Technologies Ltd.;
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): Ben Hood <0x6e6562@gmail.com>.
+%%
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit_framing.hrl").
+
+-define(PROTOCOL_VERSION_MAJOR, 0).
+-define(PROTOCOL_VERSION_MINOR, 9).
+-define(PROTOCOL_HEADER, <<"AMQP", 0, 0, 9, 1>>).
+-define(PROTOCOL, rabbit_framing_amqp_0_9_1).
+
+-define(MAX_CHANNEL_NUMBER, 65535).
+
+-record(amqp_msg, {props = #'P_basic'{}, payload = <<>>}).
+
+-record(amqp_params, {username = <<"guest">>,
+ password = <<"guest">>,
+ virtual_host = <<"/">>,
+ host = "localhost",
+ port = ?PROTOCOL_PORT,
+ channel_max = 0,
+ frame_max = 0,
+ heartbeat = 0,
+ ssl_options = none,
+ client_properties = []}).
+
+-define(LOG_DEBUG(Format), error_logger:info_msg(Format)).
+-define(LOG_INFO(Format, Args), error_logger:info_msg(Format, Args)).
+-define(LOG_WARN(Format, Args), error_logger:warning_msg(Format, Args)).
3  rebar.config
@@ -0,0 +1,3 @@
+{deps, [{rabbit_common, ".*",
+ {git, "https://github.com/dreid/rabbit_common.git", ""}}
+ ]}.
643 src/amqp_channel.erl
@@ -0,0 +1,643 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is the RabbitMQ Erlang Client.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd.,
+%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd., Cohesive Financial
+%% Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
+%% 2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
+%% Technologies Ltd.;
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): Ben Hood <0x6e6562@gmail.com>.
+%%
+
+%% @doc This module encapsulates the client's view of an AMQP
+%% channel. Each server side channel is represented by an amqp_channel
+%% process on the client side. Channel processes are created using the
+%% {@link amqp_connection} module. Channel processes are supervised
+%% under amqp_client's supervision tree.
+-module(amqp_channel).
+
+-include("amqp_client.hrl").
+
+-behaviour(gen_server).
+
+-export([start_link/3]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2]).
+-export([call/2, call/3, cast/2, cast/3]).
+-export([subscribe/3]).
+-export([close/1, close/3]).
+-export([register_return_handler/2]).
+-export([register_flow_handler/2]).
+-export([register_default_consumer/2]).
+
+-define(TIMEOUT_FLUSH, 60000).
+-define(TIMEOUT_CLOSE_OK, 3000).
+
+-record(state, {number,
+ sup,
+ driver,
+ rpc_requests = queue:new(),
+ anon_sub_requests = queue:new(),
+ tagged_sub_requests = dict:new(),
+ closing = false,
+ writer,
+ return_handler_pid = none,
+ flow_control = false,
+ flow_handler_pid = none,
+ consumers = dict:new(),
+ default_consumer = none,
+ start_infrastructure_fun
+ }).
+
+%%---------------------------------------------------------------------------
+%% Type Definitions
+%%---------------------------------------------------------------------------
+
+%% @type amqp_method().
+%% This abstract datatype represents the set of methods that comprise
+%% the AMQP execution model. As indicated in the overview, the
+%% attributes of each method in the execution model are described in
+%% the protocol documentation. The Erlang record definitions are
+%% autogenerated from a parseable version of the specification. Most
+%% fields in the generated records have sensible default values that
+%% you need not worry in the case of a simple usage of the client
+%% library.
+
+%% @type amqp_msg() = #amqp_msg{}.
+%% This is the content encapsulated in content-bearing AMQP methods. It
+%% contains the following fields:
+%% <ul>
+%% <li>props :: class_property() - A class property record, defaults to
+%% #'P_basic'{}</li>
+%% <li>payload :: binary() - The arbitrary data payload</li>
+%% </ul>
+
+%%---------------------------------------------------------------------------
+%% AMQP Channel API methods
+%%---------------------------------------------------------------------------
+
+%% @spec (Channel, Method) -> Result
+%% @doc This is equivalent to amqp_channel:call(Channel, Method, none).
+call(Channel, Method) ->
+ gen_server:call(Channel, {call, Method, none}, infinity).
+
+%% @spec (Channel, Method, Content) -> Result
+%% where
+%% Channel = pid()
+%% Method = amqp_method()
+%% Content = amqp_msg() | none
+%% Result = amqp_method() | ok | blocked | closing
+%% @doc This sends an AMQP method on the channel.
+%% For content bearing methods, Content has to be an amqp_msg(), whereas
+%% for non-content bearing methods, it needs to be the atom 'none'.<br/>
+%% In the case of synchronous methods, this function blocks until the
+%% corresponding reply comes back from the server and returns it.
+%% In the case of asynchronous methods, the function blocks until the method
+%% gets sent on the wire and returns the atom 'ok' on success.<br/>
+%% This will return the atom 'blocked' if the server has
+%% throttled the client for flow control reasons. This will return the
+%% atom 'closing' if the channel is in the process of shutting down.<br/>
+%% Note that for asynchronous methods, the synchronicity implied by
+%% 'call' only means that the client has transmitted the method to
+%% the broker. It does not necessarily imply that the broker has
+%% accepted responsibility for the message.
+call(Channel, Method, Content) ->
+ gen_server:call(Channel, {call, Method, Content}, infinity).
+
+%% @spec (Channel, Method) -> ok
+%% @doc This is equivalent to amqp_channel:cast(Channel, Method, none).
+cast(Channel, Method) ->
+ gen_server:cast(Channel, {cast, Method, none}).
+
+%% @spec (Channel, Method, Content) -> ok
+%% where
+%% Channel = pid()
+%% Method = amqp_method()
+%% Content = amqp_msg() | none
+%% @doc This function is the same as {@link call/3}, except that it returns
+%% immediately with the atom 'ok', without blocking the caller process.
+%% This function is not recommended with synchronous methods, since there is no
+%% way to verify that the server has received the method.
+cast(Channel, Method, Content) ->
+ gen_server:cast(Channel, {cast, Method, Content}).
+
+%% @spec (Channel) -> ok
+%% where
+%% Channel = pid()
+%% @doc Closes the channel, invokes
+%% close(Channel, 200, &lt;&lt;"Goodbye"&gt;&gt;).
+close(Channel) ->
+ close(Channel, 200, <<"Goodbye">>).
+
+%% @spec (Channel, Code, Text) -> ok
+%% where
+%% Channel = pid()
+%% Code = integer()
+%% Text = binary()
+%% @doc Closes the channel, allowing the caller to supply a reply code and
+%% text.
+close(Channel, Code, Text) ->
+ Close = #'channel.close'{reply_text = Text,
+ reply_code = Code,
+ class_id = 0,
+ method_id = 0},
+ #'channel.close_ok'{} = call(Channel, Close),
+ ok.
+
+%%---------------------------------------------------------------------------
+%% Consumer registration (API)
+%%---------------------------------------------------------------------------
+
+%% @type consume() = #'basic.consume'{}.
+%% The AMQP method that is used to subscribe a consumer to a queue.
+%% @spec (Channel, consume(), Consumer) -> amqp_method()
+%% where
+%% Channel = pid()
+%% Consumer = pid()
+%% @doc Creates a subscription to a queue. This subscribes a consumer pid to
+%% the queue defined in the #'basic.consume'{} method record. Note that
+%% both the process invoking this method and the supplied consumer process
+%% receive an acknowledgement of the subscription. The calling process will
+%% receive the acknowledgement as the return value of this function, whereas
+%% the consumer process will receive the notification asynchronously.
+subscribe(Channel, BasicConsume = #'basic.consume'{}, Consumer) ->
+ gen_server:call(Channel, {subscribe, BasicConsume, Consumer}, infinity).
+
+%% @spec (Channel, ReturnHandler) -> ok
+%% where
+%% Channel = pid()
+%% ReturnHandler = pid()
+%% @doc This registers a handler to deal with returned messages. The
+%% registered process will receive #basic.return{} records.
+register_return_handler(Channel, ReturnHandler) ->
+ gen_server:cast(Channel, {register_return_handler, ReturnHandler} ).
+
+%% @spec (Channel, FlowHandler) -> ok
+%% where
+%% Channel = pid()
+%% FlowHandler = pid()
+%% @doc This registers a handler to deal with channel flow notifications.
+%% The registered process will receive #channel.flow{} records.
+register_flow_handler(Channel, FlowHandler) ->
+ gen_server:cast(Channel, {register_flow_handler, FlowHandler} ).
+
+%% @spec (Channel, Consumer) -> ok
+%% where
+%% Channel = pid()
+%% Consumer = pid()
+%% @doc Set the current default consumer.
+%% Under certain circumstances it is possible for a channel to receive a
+%% message delivery which does not match any consumer which is currently
+%% set up via basic.consume. This will occur after the following sequence
+%% of events:<br/>
+%% <br/>
+%% basic.consume with explicit acks<br/>
+%% %% some deliveries take place but are not acked<br/>
+%% basic.cancel<br/>
+%% basic.recover{requeue = false}<br/>
+%% <br/>
+%% Since requeue is specified to be false in the basic.recover, the spec
+%% states that the message must be redelivered to "the original recipient"
+%% - i.e. the same channel / consumer-tag. But the consumer is no longer
+%% active.<br/>
+%% In these circumstances, you can register a default consumer to handle
+%% such deliveries. If no default consumer is registered then the channel
+%% will exit on receiving such a delivery.<br/>
+%% Most people will not need to use this.
+register_default_consumer(Channel, Consumer) ->
+ gen_server:cast(Channel, {register_default_consumer, Consumer}).
+
+%%---------------------------------------------------------------------------
+%% RPC mechanism
+%%---------------------------------------------------------------------------
+
+rpc_top_half(Method, Content, From,
+ State0 = #state{rpc_requests = RequestQueue}) ->
+ State1 = State0#state{
+ rpc_requests = queue:in({From, Method, Content}, RequestQueue)},
+ IsFirstElement = queue:is_empty(RequestQueue),
+ if IsFirstElement -> do_rpc(State1);
+ true -> State1
+ end.
+
+rpc_bottom_half(Reply, State = #state{rpc_requests = RequestQueue}) ->
+ {{value, {From, _Method, _Content}}, RequestQueue1} =
+ queue:out(RequestQueue),
+ case From of
+ none -> ok;
+ _ -> gen_server:reply(From, Reply)
+ end,
+ do_rpc(State#state{rpc_requests = RequestQueue1}).
+
+do_rpc(State = #state{rpc_requests = RequestQueue,
+ closing = Closing}) ->
+ case queue:peek(RequestQueue) of
+ {value, {_From, Method, Content}} ->
+ State1 = pre_do(Method, Content, State),
+ do(Method, Content, State1),
+ State1;
+ empty ->
+ case Closing of
+ {connection, Reason} -> self() ! {shutdown, Reason};
+ _ -> ok
+ end,
+ State
+ end.
+
+pre_do(#'channel.open'{}, _Content, State) ->
+ start_infrastructure(State);
+pre_do(#'channel.close'{}, _Content, State) ->
+ State#state{closing = just_channel};
+pre_do(_, _, State) ->
+ State.
+
+%%---------------------------------------------------------------------------
+%% Internal plumbing
+%%---------------------------------------------------------------------------
+
+do(Method, Content, #state{driver = Driver, writer = Writer}) ->
+ ok = amqp_channel_util:do(Driver, Writer, Method, Content).
+
+start_infrastructure(State = #state{start_infrastructure_fun = SIF}) ->
+ {ok, Writer} = SIF(),
+ State#state{writer = Writer}.
+
+resolve_consumer(_ConsumerTag, #state{consumers = []}) ->
+ exit(no_consumers_registered);
+resolve_consumer(ConsumerTag, #state{consumers = Consumers,
+ default_consumer = DefaultConsumer}) ->
+ case dict:find(ConsumerTag, Consumers) of
+ {ok, Value} ->
+ Value;
+ error ->
+ case is_pid(DefaultConsumer) of
+ true -> DefaultConsumer;
+ false -> exit(unexpected_delivery_and_no_default_consumer)
+ end
+ end.
+
+register_consumer(ConsumerTag, Consumer,
+ State = #state{consumers = Consumers0}) ->
+ Consumers1 = dict:store(ConsumerTag, Consumer, Consumers0),
+ State#state{consumers = Consumers1}.
+
+unregister_consumer(ConsumerTag,
+ State = #state{consumers = Consumers0}) ->
+ Consumers1 = dict:erase(ConsumerTag, Consumers0),
+ State#state{consumers = Consumers1}.
+
+amqp_msg(none) ->
+ none;
+amqp_msg(Content) ->
+ {Props, Payload} = rabbit_basic:from_content(Content),
+ #amqp_msg{props = Props, payload = Payload}.
+
+build_content(none) ->
+ none;
+build_content(#amqp_msg{props = Props, payload = Payload}) ->
+ rabbit_basic:build_content(Props, Payload).
+
+check_block(_Method, _AmqpMsg, #state{closing = just_channel}) ->
+ channel_closing;
+check_block(_Method, _AmqpMsg, #state{closing = {connection, _}}) ->
+ connection_closing;
+check_block(_Method, none, #state{}) ->
+ ok;
+check_block(_Method, _AmqpMsg, #state{flow_control = true}) ->
+ blocked;
+check_block(_Method, _AmqpMsg, #state{}) ->
+ ok.
+
+shutdown_with_reason({_, 200, _}, State) ->
+ {stop, normal, State};
+shutdown_with_reason(Reason, State) ->
+ {stop, Reason, State}.
+
+%%---------------------------------------------------------------------------
+%% Handling of methods from the server
+%%---------------------------------------------------------------------------
+
+handle_method(Method, Content, State = #state{closing = Closing}) ->
+ case {Method, Content} of
+ %% Handle 'channel.close': send 'channel.close_ok' and stop channel.
+ {#'channel.close'{reply_code = ReplyCode,
+ reply_text = ReplyText}, none} ->
+ do(#'channel.close_ok'{}, none, State),
+ {stop, {server_initiated_close, ReplyCode, ReplyText}, State};
+ %% Handle 'channel.close_ok': stop channel
+ {CloseOk = #'channel.close_ok'{}, none} ->
+ {stop, normal, rpc_bottom_half(CloseOk, State)};
+ _ ->
+ case Closing of
+ %% Drop all incomming traffic except 'channel.close' and
+ %% 'channel.close_ok' when channel is closing (has sent
+ %% 'channel.close')
+ just_channel ->
+ ?LOG_INFO("Channel (~p): dropping method ~p from server "
+ "because channel is closing~n",
+ [self(), {Method, Content}]),
+ {noreply, State};
+ %% Standard handling of incoming method
+ _ ->
+ handle_regular_method(Method, amqp_msg(Content), State)
+ end
+ end.
+
+handle_regular_method(
+ #'basic.consume_ok'{consumer_tag = ConsumerTag} = ConsumeOk, none,
+ #state{tagged_sub_requests = Tagged,
+ anon_sub_requests = Anon} = State) ->
+ {Consumer, State0} =
+ case dict:find(ConsumerTag, Tagged) of
+ {ok, C} ->
+ NewTagged = dict:erase(ConsumerTag, Tagged),
+ {C, State#state{tagged_sub_requests = NewTagged}};
+ error ->
+ {{value, C}, NewAnon} = queue:out(Anon),
+ {C, State#state{anon_sub_requests = NewAnon}}
+ end,
+ Consumer ! ConsumeOk,
+ State1 = register_consumer(ConsumerTag, Consumer, State0),
+ {noreply, rpc_bottom_half(ConsumeOk, State1)};
+
+handle_regular_method(
+ #'basic.cancel_ok'{consumer_tag = ConsumerTag} = CancelOk, none,
+ #state{} = State) ->
+ Consumer = resolve_consumer(ConsumerTag, State),
+ Consumer ! CancelOk,
+ NewState = unregister_consumer(ConsumerTag, State),
+ {noreply, rpc_bottom_half(CancelOk, NewState)};
+
+%% Handle 'channel.flow'
+%% If flow_control flag is defined, it informs the flow control handler to
+%% suspend submitting any content bearing methods
+handle_regular_method(#'channel.flow'{active = Active} = Flow, none,
+ #state{flow_handler_pid = FlowHandler} = State) ->
+ case FlowHandler of
+ none -> ok;
+ _ -> FlowHandler ! Flow
+ end,
+ do(#'channel.flow_ok'{active = Active}, none, State),
+ {noreply, State#state{flow_control = not(Active)}};
+
+handle_regular_method(#'basic.deliver'{consumer_tag = ConsumerTag} = Deliver,
+ AmqpMsg, State) ->
+ Consumer = resolve_consumer(ConsumerTag, State),
+ Consumer ! {Deliver, AmqpMsg},
+ {noreply, State};
+
+handle_regular_method(#'basic.return'{} = BasicReturn, AmqpMsg,
+ #state{return_handler_pid = ReturnHandler} = State) ->
+ case ReturnHandler of
+ none -> ?LOG_WARN("Channel (~p): received {~p, ~p} but there is no "
+ "return handler registered~n",
+ [self(), BasicReturn, AmqpMsg]);
+ _ -> ReturnHandler ! {BasicReturn, AmqpMsg}
+ end,
+ {noreply, State};
+
+handle_regular_method(Method, none, State) ->
+ {noreply, rpc_bottom_half(Method, State)};
+
+handle_regular_method(Method, Content, State) ->
+ {noreply, rpc_bottom_half({Method, Content}, State)}.
+
+%%---------------------------------------------------------------------------
+%% Internal interface
+%%---------------------------------------------------------------------------
+
+%% @private
+start_link(Driver, ChannelNumber, SIF) ->
+ gen_server:start_link(?MODULE, [self(), Driver, ChannelNumber, SIF], []).
+
+%%---------------------------------------------------------------------------
+%% gen_server callbacks
+%%---------------------------------------------------------------------------
+
+%% @private
+init([Sup, Driver, ChannelNumber, SIF]) ->
+ {ok, #state{sup = Sup,
+ driver = Driver,
+ number = ChannelNumber,
+ start_infrastructure_fun = SIF}}.
+
+%% Standard implementation of the call/{2,3} command
+%% @private
+handle_call({call, Method, AmqpMsg}, From, State) ->
+ case {Method, check_block(Method, AmqpMsg, State)} of
+ {#'basic.consume'{}, _} ->
+ {reply, {error, use_subscribe}, State};
+ {_, ok} ->
+ Content = build_content(AmqpMsg),
+ case ?PROTOCOL:is_method_synchronous(Method) of
+ true -> {noreply, rpc_top_half(Method, Content, From, State)};
+ false -> do(Method, Content, State),
+ {reply, ok, State}
+ end;
+ {_, BlockReply} ->
+ {reply, BlockReply, State}
+ end;
+
+%% Standard implementation of the subscribe/3 command
+%% @private
+handle_call({subscribe, #'basic.consume'{consumer_tag = Tag} = Method, Consumer},
+ From, #state{tagged_sub_requests = Tagged,
+ anon_sub_requests = Anon} = State) ->
+ case check_block(Method, none, State) of
+ ok ->
+ {NewMethod, NewState} =
+ if Tag =:= undefined orelse size(Tag) == 0 ->
+ NewAnon = queue:in(Consumer, Anon),
+ {Method#'basic.consume'{consumer_tag = <<"">>},
+ State#state{anon_sub_requests = NewAnon}};
+ is_binary(Tag) ->
+ %% TODO test whether this tag already exists, either in
+ %% the pending tagged request map or in general as
+ %% already subscribed consumer
+ NewTagged = dict:store(Tag, Consumer, Tagged),
+ {Method, State#state{tagged_sub_requests = NewTagged}}
+ end,
+ {noreply, rpc_top_half(NewMethod, none, From, NewState)};
+ BlockReply ->
+ {reply, BlockReply, State}
+ end;
+
+%% These handle the delivery of messages from a direct channel
+%% @private
+handle_call({send_command_sync, Method, Content}, From, State) ->
+ Ret = handle_method(Method, Content, State),
+ gen_server:reply(From, ok),
+ Ret;
+%% @private
+handle_call({send_command_sync, Method}, From, State) ->
+ Ret = handle_method(Method, none, State),
+ gen_server:reply(From, ok),
+ Ret.
+
+%% Standard implementation of the cast/{2,3} command
+%% @private
+handle_cast({cast, Method, AmqpMsg} = Cast, State) ->
+ case {Method, check_block(Method, AmqpMsg, State)} of
+ {#'basic.consume'{}, _} ->
+ ?LOG_WARN("Channel (~p): ignoring cast of ~p method. "
+ "Use subscribe/3 instead!~n", [self(), Method]),
+ {noreply, State};
+ {_, ok} ->
+ Content = build_content(AmqpMsg),
+ case ?PROTOCOL:is_method_synchronous(Method) of
+ true -> ?LOG_WARN("Channel (~p): casting synchronous method "
+ "~p.~n"
+ "The reply will be ignored!~n",
+ [self(), Method]),
+ {noreply, rpc_top_half(Method, Content, none, State)};
+ false -> do(Method, Content, State),
+ {noreply, State}
+ end;
+ {_, BlockReply} ->
+ ?LOG_WARN("Channel (~p): discarding method in cast ~p.~n"
+ "Reason: ~p~n", [self(), Cast, BlockReply]),
+ {noreply, State}
+ end;
+
+%% Registers a handler to process return messages
+%% @private
+handle_cast({register_return_handler, ReturnHandler}, State) ->
+ erlang:monitor(process, ReturnHandler),
+ {noreply, State#state{return_handler_pid = ReturnHandler}};
+
+%% Registers a handler to process flow control messages
+%% @private
+handle_cast({register_flow_handler, FlowHandler}, State) ->
+ erlang:monitor(process, FlowHandler),
+ {noreply, State#state{flow_handler_pid = FlowHandler}};
+
+%% Registers a handler to process unexpected deliveries
+%% @private
+handle_cast({register_default_consumer, Consumer}, State) ->
+ erlang:monitor(process, Consumer),
+ {noreply, State#state{default_consumer = Consumer}};
+
+%% @private
+handle_cast({notify_sent, _Peer}, State) ->
+ {noreply, State};
+
+%% This callback is invoked when a network channel sends messages
+%% to this gen_server instance
+%% @private
+handle_cast({method, Method, Content}, State) ->
+ handle_method(Method, Content, State).
+
+%% These callbacks are invoked when a direct channel sends messages
+%% to this gen_server instance
+%% @private
+handle_info({send_command, Method}, State) ->
+ handle_method(Method, none, State);
+%% @private
+handle_info({send_command, Method, Content}, State) ->
+ handle_method(Method, Content, State);
+
+%% This handles the delivery of a message from a direct channel
+%% @private
+handle_info({send_command_and_notify, Q, ChPid, Method, Content}, State) ->
+ handle_method(Method, Content, State),
+ rabbit_amqqueue:notify_sent(Q, ChPid),
+ {noreply, State};
+
+%% @private
+handle_info({shutdown, Reason}, State) ->
+ shutdown_with_reason(Reason, State);
+
+%% @private
+handle_info({shutdown, FailShutdownReason, InitialReason},
+ #state{number = Number} = State) ->
+ case FailShutdownReason of
+ {connection_closing, timed_out_flushing_channel} ->
+ ?LOG_WARN("Channel ~p closing: timed out flushing while connection "
+ "closing~n", [Number]);
+ {connection_closing, timed_out_waiting_close_ok} ->
+ ?LOG_WARN("Channel ~p closing: timed out waiting for "
+ "channel.close_ok while connection closing~n", [Number])
+ end,
+ {stop, {FailShutdownReason, InitialReason}, State};
+
+%% Handles the situation when the connection closes without closing the channel
+%% beforehand. The channel must block all further RPCs,
+%% flush the RPC queue (optional), and terminate
+%% @private
+handle_info({connection_closing, CloseType, Reason},
+ #state{rpc_requests = RpcQueue,
+ closing = Closing} = State) ->
+ case {CloseType, Closing, queue:is_empty(RpcQueue)} of
+ {flush, false, false} ->
+ erlang:send_after(?TIMEOUT_FLUSH, self(),
+ {shutdown,
+ {connection_closing, timed_out_flushing_channel},
+ Reason}),
+ {noreply, State#state{closing = {connection, Reason}}};
+ {flush, just_channel, false} ->
+ erlang:send_after(?TIMEOUT_CLOSE_OK, self(),
+ {shutdown,
+ {connection_closing, timed_out_waiting_close_ok},
+ Reason}),
+ {noreply, State};
+ _ ->
+ shutdown_with_reason(Reason, State)
+ end;
+
+%% This is for a channel exception that is sent by the direct
+%% rabbit_channel process
+%% @private
+handle_info({channel_exit, _Channel, #amqp_error{name = ErrorName,
+ explanation = Expl} = Error},
+ State = #state{number = Number}) ->
+ ?LOG_WARN("Channel ~p closing: server sent error ~p~n", [Number, Error]),
+ {_, Code, _} = ?PROTOCOL:lookup_amqp_exception(ErrorName),
+ {stop, {server_initiated_close, Code, Expl}, State};
+
+%% @private
+handle_info({'DOWN', _, process, Pid, Reason}, State) ->
+ handle_down(Pid, Reason, State).
+
+handle_down(ReturnHandler, Reason,
+ State = #state{return_handler_pid = ReturnHandler}) ->
+ ?LOG_WARN("Channel (~p): Unregistering return handler ~p because it died. "
+ "Reason: ~p~n", [self(), ReturnHandler, Reason]),
+ {noreply, State#state{return_handler_pid = none}};
+handle_down(FlowHandler, Reason,
+ State = #state{flow_handler_pid = FlowHandler}) ->
+ ?LOG_WARN("Channel (~p): Unregistering flow handler ~p because it died. "
+ "Reason: ~p~n", [self(), FlowHandler, Reason]),
+ {noreply, State#state{flow_handler_pid = none}};
+handle_down(DefaultConsumer, Reason,
+ State = #state{default_consumer = DefaultConsumer}) ->
+ ?LOG_WARN("Channel (~p): Unregistering default consumer ~p because it died."
+ "Reason: ~p~n", [self(), DefaultConsumer, Reason]),
+ {noreply, State#state{default_consumer = none}};
+handle_down(Other, Reason, State) ->
+ {stop, {unexpected_down, Other, Reason}, State}.
+
+%%---------------------------------------------------------------------------
+%% Rest of the gen_server callbacks
+%%---------------------------------------------------------------------------
+
+%% @private
+terminate(_Reason, _State) ->
+ ok.
+
+%% @private
+code_change(_OldVsn, State, _Extra) ->
+ State.
99 src/amqp_channel_sup.erl
@@ -0,0 +1,99 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is the RabbitMQ Erlang Client.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd.,
+%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd., Cohesive Financial
+%% Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
+%% 2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
+%% Technologies Ltd.;
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ____________________.
+
+%% @private
+-module(amqp_channel_sup).
+
+-include("amqp_client.hrl").
+
+-behaviour(supervisor2).
+
+-export([start_link/3]).
+-export([init/1]).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link(Driver, InfraArgs, ChNumber) ->
+ {ok, Sup} = supervisor2:start_link(?MODULE, []),
+ SIF = start_infrastructure_fun(Sup, Driver, InfraArgs, ChNumber),
+ {ok, ChPid} =
+ supervisor2:start_child(
+ Sup, {channel, {amqp_channel, start_link, [Driver, ChNumber, SIF]},
+ intrinsic, brutal_kill, worker, [amqp_channel]}),
+ {ok, Sup, ChPid}.
+
+%%---------------------------------------------------------------------------
+%% Internal plumbing
+%%---------------------------------------------------------------------------
+
+start_infrastructure_fun(Sup, direct, [User, VHost, Collector], ChNumber) ->
+ fun () ->
+ ChPid = self(),
+ supervisor2:start_child(
+ Sup,
+ {rabbit_channel, {rabbit_channel, start_link,
+ [ChNumber, ChPid, ChPid, User, VHost,
+ Collector, start_limiter_fun(Sup)]},
+ transient, ?MAX_WAIT, worker, [rabbit_channel]})
+ end;
+start_infrastructure_fun(Sup, network, [Sock, MainReader], ChNumber) ->
+ fun () ->
+ ChPid = self(),
+ {ok, Framing} =
+ supervisor2:start_child(
+ Sup,
+ {framing, {rabbit_framing_channel, start_link,
+ [Sup, ChPid, ?PROTOCOL]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}),
+ {ok, Writer} =
+ supervisor2:start_child(
+ Sup,
+ {writer, {rabbit_writer, start_link,
+ [Sock, ChNumber, ?FRAME_MIN_SIZE, ?PROTOCOL,
+ MainReader]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}),
+ %% This call will disappear as part of bug 23024
+ amqp_main_reader:register_framing_channel(MainReader, ChNumber,
+ Framing),
+ {ok, Writer}
+ end.
+
+start_limiter_fun(Sup) ->
+ fun (UnackedCount) ->
+ Parent = self(),
+ {ok, _} = supervisor2:start_child(
+ Sup,
+ {limiter, {rabbit_limiter, start_link,
+ [Parent, UnackedCount]},
+ transient, ?MAX_WAIT, worker, [rabbit_limiter]})
+ end.
+
+%%---------------------------------------------------------------------------
+%% supervisor2 callbacks
+%%---------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
52 src/amqp_channel_sup_sup.erl
@@ -0,0 +1,52 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is the RabbitMQ Erlang Client.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd.,
+%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd., Cohesive Financial
+%% Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
+%% 2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
+%% Technologies Ltd.;
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ____________________.
+
+%% @private
+-module(amqp_channel_sup_sup).
+
+-include("amqp_client.hrl").
+
+-behaviour(supervisor2).
+
+-export([start_link/1, start_channel_sup/3]).
+-export([init/1]).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link(Driver) ->
+ supervisor2:start_link(?MODULE, [Driver]).
+
+start_channel_sup(Sup, InfraArgs, ChannelNumber) ->
+ supervisor2:start_child(Sup, [InfraArgs, ChannelNumber]).
+
+%%---------------------------------------------------------------------------
+%% supervisor2 callbacks
+%%---------------------------------------------------------------------------
+
+init([Driver]) ->
+ {ok, {{simple_one_for_one, 0, 1},
+ [{channel_sup, {amqp_channel_sup, start_link, [Driver]},
+ temporary, brutal_kill, supervisor, [amqp_channel_sup]}]}}.
215 src/amqp_channel_util.erl
@@ -0,0 +1,215 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is the RabbitMQ Erlang Client.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd.,
+%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd., Cohesive Financial
+%% Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
+%% 2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
+%% Technologies Ltd.;
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): Ben Hood <0x6e6562@gmail.com>.
+
+%% @private
+-module(amqp_channel_util).
+
+-include("amqp_client.hrl").
+
+-export([open_channel/5]).
+-export([do/4]).
+-export([new_channel_dict/0, is_channel_dict_empty/1, num_channels/1,
+ register_channel/3, unregister_channel_number/2,
+ unregister_channel_pid/2, resolve_channel_number/2,
+ resolve_channel_pid/2, is_channel_number_registered/2,
+ is_channel_pid_registered/2, channel_number/3]).
+-export([broadcast_to_channels/2, handle_exit/4]).
+
+%%---------------------------------------------------------------------------
+%% Opening channels
+%%---------------------------------------------------------------------------
+
+open_channel(ChSupSup, ProposedNumber, MaxChannel, InfraArgs, Channels) ->
+ ChannelNumber = channel_number(ProposedNumber, Channels, MaxChannel),
+ {ok, _ChannelSup, ChPid} = amqp_channel_sup_sup:start_channel_sup(
+ ChSupSup, InfraArgs, ChannelNumber),
+ #'channel.open_ok'{} = amqp_channel:call(ChPid, #'channel.open'{}),
+ erlang:monitor(process, ChPid),
+ NewChannels = register_channel(ChannelNumber, ChPid, Channels),
+ {ChPid, NewChannels}.
+
+%%---------------------------------------------------------------------------
+%% Do
+%%---------------------------------------------------------------------------
+
+do(network, Writer, Method, Content) ->
+ case Content of
+ none -> rabbit_writer:send_command_sync(Writer, Method);
+ _ -> rabbit_writer:send_command_sync(Writer, Method, Content)
+ end;
+do(direct, RabbitChannel, Method, Content) ->
+ case Content of
+ none -> rabbit_channel:do(RabbitChannel, Method);
+ _ -> rabbit_channel:do(RabbitChannel, Method, Content)
+ end.
+
+%%---------------------------------------------------------------------------
+%% Channel number/pid registration
+%%---------------------------------------------------------------------------
+
+%% New channel dictionary for keeping track of the mapping and reverse mapping
+%% between the channel pid's and the channel numbers
+new_channel_dict() ->
+ {gb_trees:empty(), dict:new()}.
+
+%% Returns true iff there are no channels currently registered in the given
+%% dictionary
+is_channel_dict_empty(_Channels = {TreeNP, _}) ->
+ gb_trees:is_empty(TreeNP).
+
+%% Returns the number of channels registered in the channels dictionary
+num_channels(_Channels = {TreeNP, _}) ->
+ gb_trees:size(TreeNP).
+
+%% Register a channel in a given channel dictionary
+register_channel(Number, Pid, _Channels = {TreeNP, DictPN}) ->
+ case gb_trees:is_defined(Number, TreeNP) of
+ false ->
+ TreeNP1 = gb_trees:enter(Number, Pid, TreeNP),
+ DictPN1 = dict:store(Pid, Number, DictPN),
+ {TreeNP1, DictPN1};
+ true ->
+ erlang:error({channel_already_registered, Number})
+ end.
+
+%% Unregister a channel given its number
+unregister_channel_number(Number, Channels = {TreeNP, _}) ->
+ case gb_trees:lookup(Number, TreeNP) of
+ {value, Pid} -> unregister_channel(Number, Pid, Channels);
+ none -> erlang:error(channel_number_not_registered)
+ end.
+
+%% Unregister a channel given its pid
+unregister_channel_pid(Pid, Channels = {_, DictPN}) ->
+ case dict:fetch(Pid, DictPN) of
+ undefined -> erlang:error(channel_pid_not_registered);
+ Number -> unregister_channel(Number, Pid, Channels)
+ end.
+
+unregister_channel(Number, Pid, {TreeNP, DictPN}) ->
+ TreeNP1 = gb_trees:delete(Number, TreeNP),
+ DictPN1 = dict:erase(Pid, DictPN),
+ {TreeNP1, DictPN1}.
+
+%% Get channel pid, given its number. Returns undefined if channel number
+%% is not registered.
+resolve_channel_number(Number, _Channels = {TreeNP, _}) ->
+ case gb_trees:lookup(Number, TreeNP) of
+ {value, Pid} -> Pid;
+ none -> undefined
+ end.
+
+%% Get channel number, given its pid. Assumes pid is registered
+resolve_channel_pid(Pid, _Channels = {_, DictPN}) ->
+ dict:fetch(Pid, DictPN).
+
+%% Returns true iff channel number is registered in the given channel
+%% dictionary
+is_channel_number_registered(Number, _Channels = {TreeNP, _}) ->
+ gb_trees:is_defined(Number, TreeNP).
+
+%% Returns true iff pid is registered in the given channel dictionary
+is_channel_pid_registered(Pid, _Channels = {_, DictPN}) ->
+ dict:is_key(Pid, DictPN).
+
+%% Returns an available channel number in the given channel dictionary
+channel_number(none, Channels, 0) ->
+ channel_number(none, Channels, ?MAX_CHANNEL_NUMBER);
+channel_number(none, _Channels = {TreeNP, _}, MaxChannel) ->
+ case gb_trees:is_empty(TreeNP) of
+ true ->
+ 1;
+ false ->
+ {Smallest, _} = gb_trees:smallest(TreeNP),
+ if Smallest > 1 ->
+ Smallest - 1;
+ true ->
+ {Largest, _} = gb_trees:largest(TreeNP),
+ if Largest < MaxChannel ->
+ Largest + 1;
+ true ->
+ find_available_number(gb_trees:iterator(TreeNP), 1)
+ end
+ end
+ end;
+channel_number(ProposedNumber, Channels, 0) ->
+ channel_number(ProposedNumber, Channels, ?MAX_CHANNEL_NUMBER);
+channel_number(ProposedNumber, Channels, MaxChannel) ->
+ IsNumberValid = ProposedNumber > 0 andalso
+ ProposedNumber =< MaxChannel andalso
+ not is_channel_number_registered(ProposedNumber, Channels),
+ if IsNumberValid -> ProposedNumber;
+ true -> channel_number(none, Channels, MaxChannel)
+ end.
+
+find_available_number(It, Candidate) ->
+ case gb_trees:next(It) of
+ {Number, _, It1} ->
+ if Number > Candidate -> Number - 1;
+ Number =:= Candidate -> find_available_number(It1, Candidate + 1);
+ true -> erlang:error(unexpected_structure)
+ end;
+ none ->
+ erlang:error(out_of_channel_numbers)
+ end.
+
+%%---------------------------------------------------------------------------
+%% Other channel utilities
+%%---------------------------------------------------------------------------
+
+broadcast_to_channels(Message, _Channels = {_, DictPN}) ->
+ dict:map(fun(ChannelPid, _) -> ChannelPid ! Message, ok end, DictPN),
+ ok.
+
+handle_exit(Pid, Reason, Channels, Closing) ->
+ case is_channel_pid_registered(Pid, Channels) of
+ true -> handle_channel_exit(Pid, Reason, Closing);
+ false -> ?LOG_WARN("Connection (~p) closing: received unexpected "
+ "down signal from (~p). Reason: ~p~n",
+ [self(), Pid, Reason]),
+ other
+ end.
+
+handle_channel_exit(_Pid, normal, _Closing) ->
+ normal;
+handle_channel_exit(Pid, {server_initiated_close, Code, _Text}, false) ->
+ %% Channel terminating (server sent 'channel.close')
+ {IsHardError, _, _} = ?PROTOCOL:lookup_amqp_exception(
+ ?PROTOCOL:amqp_exception(Code)),
+ case IsHardError of
+ true -> ?LOG_WARN("Connection (~p) closing: channel (~p) "
+ "received hard error from server~n", [self(), Pid]),
+ stop;
+ false -> normal
+ end;
+handle_channel_exit(_Pid, {_CloseReason, _Code, _Text}, Closing)
+ when Closing =/= false ->
+ %% Channel terminating due to connection closing
+ normal;
+handle_channel_exit(Pid, Reason, _Closing) ->
+ %% amqp_channel dies with internal reason - this takes
+ %% the entire connection down
+ ?LOG_WARN("Connection (~p) closing: channel (~p) died. Reason: ~p~n",
+ [self(), Pid, Reason]),
+ close.
16 src/amqp_client.app.src
@@ -0,0 +1,16 @@
+{application, amqp_client,
+ [{description, "RabbitMQ AMQP Client"},
+ {vsn, "0.0.0"},
+ {modules, [
+ amqp_channel,
+ amqp_channel_util,
+ amqp_connection,
+ amqp_direct_connection,
+ amqp_main_reader,
+ amqp_network_connection,
+ amqp_rpc_client,
+ amqp_rpc_server
+ ]},
+ {registered, []},
+ {env, []},
+ {applications, [kernel, stdlib]}]}.
217 src/amqp_connection.erl
@@ -0,0 +1,217 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is the RabbitMQ Erlang Client.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd.,
+%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd., Cohesive Financial
+%% Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
+%% 2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
+%% Technologies Ltd.;
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): Ben Hood <0x6e6562@gmail.com>.
+
+%% @doc This module is responsible for maintaining a connection to an AMQP
+%% broker and manages channels within the connection. This module is used to
+%% open and close connections to the broker as well as creating new channels
+%% within a connection.<br/>
+%% The connections and channels created by this module are supervised under
+%% amqp_client's supervision tree. Please note that connections and channels
+%% do not get restarted automatically by the supervision tree in the case of a
+%% failure. If you need robust connections and channels, we recommend you use
+%% Erlang monitors on the returned connection and channel PID.
+-module(amqp_connection).
+
+-include("amqp_client.hrl").
+
+-export([open_channel/1, open_channel/2]).
+-export([start/1, start/2]).
+-export([close/1, close/3]).
+-export([info/2, info_keys/1, info_keys/0]).
+
+-define(COMMON_INFO_KEYS,
+ [server_properties, is_closing, amqp_params, num_channels]).
+
+%%---------------------------------------------------------------------------
+%% Type Definitions
+%%---------------------------------------------------------------------------
+
+%% @type amqp_params() = #amqp_params{}.
+%% As defined in amqp_client.hrl. It contains the following fields:
+%% <ul>
+%% <li>username :: binary() - The name of a user registered with the broker,
+%% defaults to &lt;&lt;guest"&gt;&gt;</li>
+%% <li>password :: binary() - The user's password, defaults to
+%% &lt;&lt;"guest"&gt;&gt;</li>
+%% <li>virtual_host :: binary() - The name of a virtual host in the broker,
+%% defaults to &lt;&lt;"/"&gt;&gt;</li>
+%% <li>host :: string() - The hostname of the broker,
+%% defaults to "localhost"</li>
+%% <li>port :: integer() - The port the broker is listening on,
+%% defaults to 5672</li>
+%% <li>channel_max :: non_neg_integer() - The channel_max handshake parameter,
+%% defaults to 0</li>
+%% <li>frame_max :: non_neg_integer() - The frame_max handshake parameter,
+%% defaults to 0</li>
+%% <li>heartbeat :: non_neg_integer() - The hearbeat interval in seconds,
+%% defaults to 0 (turned off)</li>
+%% <li>ssl_options :: term() - The second parameter to be used with the
+%% ssl:connect/2 function, defaults to 'none'</li>
+%% <li>client_properties :: [{binary(), atom(), binary()}] - A list of extra
+%% client properties to be sent to the server, defaults to []</li>
+%% </ul>
+
+%%---------------------------------------------------------------------------
+%% Starting a connection
+%%---------------------------------------------------------------------------
+
+%% @spec (Type) -> {ok, Connection} | {error, Error}
+%% where
+%% Type = network | direct
+%% Connection = pid()
+%% @doc Starts a connection to an AMQP server. Use network type to connect
+%% to a remote AMQP server - default connection settings are used, meaning that
+%% the server is expected to be at localhost:5672, with a vhost of "/"
+%% authorising a user guest/guest. Use direct type for a direct connection to
+%% a RabbitMQ server, assuming that the server is running in the same process
+%% space, and with a default set of amqp_params. If a different host, port,
+%% vhost or credential set is required, start/2 should be used.
+start(Type) ->
+ start(Type, #amqp_params{}).
+
+%% @spec (Type, amqp_params()) -> {ok, Connection} | {error, Error}
+%% where
+%% Type = network | direct
+%% Connection = pid()
+%% @doc Starts a connection to an AMQP server. Use network type to connect
+%% to a remote AMQP server or direct type for a direct connection to
+%% a RabbitMQ server, assuming that the server is running in the same process
+%% space.
+start(Type, AmqpParams) ->
+ {ok, _Sup, Connection} = amqp_connection_sup:start_link(Type, AmqpParams),
+ Module = case Type of direct -> amqp_direct_connection;
+ network -> amqp_network_connection
+ end,
+ try Module:connect(Connection) of
+ ok -> {ok, Connection}
+ catch
+ exit:{Reason = {protocol_version_mismatch, _, _}, _} ->
+ {error, Reason};
+ exit:Reason ->
+ {error, {auth_failure_likely, Reason}}
+ end.
+
+%%---------------------------------------------------------------------------
+%% Commands
+%%---------------------------------------------------------------------------
+
+%% @doc Invokes open_channel(ConnectionPid, none).
+%% Opens a channel without having to specify a channel number.
+open_channel(ConnectionPid) ->
+ open_channel(ConnectionPid, none).
+
+%% @spec (ConnectionPid, ChannelNumber) -> {ok, ChannelPid} | {error, Error}
+%% where
+%% ChannelNumber = pos_integer() | 'none'
+%% ConnectionPid = pid()
+%% ChannelPid = pid()
+%% @doc Opens an AMQP channel.<br/>
+%% This function assumes that an AMQP connection (networked or direct)
+%% has already been successfully established.<br/>
+%% ChannelNumber must be less than or equal to the negotiated max_channel value,
+%% or less than or equal to ?MAX_CHANNEL_NUMBER if the negotiated max_channel
+%% value is 0.<br/>
+%% In the direct connection, max_channel is always 0.
+open_channel(ConnectionPid, ChannelNumber) ->
+ command(ConnectionPid, {open_channel, ChannelNumber}).
+
+%% @doc Invokes close(ConnectionPid, 200, &lt;&lt;"Goodbye"&gt;&gt;).
+close(ConnectionPid) ->
+ close(ConnectionPid, 200, <<"Goodbye">>).
+
+%% @spec (ConnectionPid, Code, Text) -> ok | closing
+%% where
+%% ConnectionPid = pid()
+%% Code = integer()
+%% Text = binary()
+%% @doc Closes the AMQP connection, allowing the caller to set the reply
+%% code and text.
+close(ConnectionPid, Code, Text) ->
+ Close = #'connection.close'{reply_text = Text,
+ reply_code = Code,
+ class_id = 0,
+ method_id = 0},
+ command(ConnectionPid, {close, Close}).
+
+%%---------------------------------------------------------------------------
+%% Other functions
+%%---------------------------------------------------------------------------
+
+%% @spec (ConnectionPid, Items) -> ResultList
+%% where
+%% ConnectionPid = pid()
+%% Items = [Item]
+%% ResultList = [{Item, Result}]
+%% Item = atom()
+%% Result = term()
+%% @doc Returns information about the connection, as specified by the Items
+%% list. Item may be any atom returned by info_keys/1:
+%% server_properties - returns the server_properties fiels sent by the
+%% server while establishing the connection
+%% is_closing - returns true if the connection is in the process of closing
+%% and false otherwise
+%% amqp_params - returns the #amqp_params{} structure used to start the
+%% connection
+%% num_channels - returns the number of channels currently open under the
+%% connection (excluding channel 0)
+%% max_channel - returns the max_channel value negotiated with the server
+%% (only for the network connection)
+%% heartbeat - returns the heartbeat value negotiated with the server
+%% (only for the network connection)
+%% sock - returns the socket for the network connection (for use with
+%% e.g. inet:sockname/1)
+%% (only for the network connection)
+%% any other value - throws an exception
+info(ConnectionPid, Items) ->
+ gen_server:call(ConnectionPid, {info, Items}, infinity).
+
+%% @spec (ConnectionPid) -> Items
+%% where
+%% ConnectionPid = pid()
+%% Items = [Item]
+%% Item = atom()
+%% @doc Returns a list of atoms that can be used in conjunction with info/2.
+%% Note that the list differs from a type of connection to another (network vs.
+%% direct). Use info_keys/0 to get a list of info keys that can be used for
+%% any connection.
+info_keys(ConnectionPid) ->
+ gen_server:call(ConnectionPid, info_keys, infinity).
+
+%% @spec () -> Items
+%% where
+%% Items = [Item]
+%% Item = atom()
+%% @doc Returns a list of atoms that can be used in conjunction with info/2.
+%% These are general info keys, which can be used in any type of connection.
+%% Other info keys may exist for a specific type. To get the full list of
+%% atoms that can be used for a certain connection, use info_keys/1.
+info_keys() ->
+ ?COMMON_INFO_KEYS.
+
+%%---------------------------------------------------------------------------
+%% Internal plumbing
+%%---------------------------------------------------------------------------
+
+command(ConnectionPid, Command) ->
+ gen_server:call(ConnectionPid, {command, Command}, infinity).
99 src/amqp_connection_sup.erl
@@ -0,0 +1,99 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is the RabbitMQ Erlang Client.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd.,
+%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd., Cohesive Financial
+%% Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
+%% 2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
+%% Technologies Ltd.;
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ____________________.
+
+%% @private
+-module(amqp_connection_sup).
+
+-include("amqp_client.hrl").
+
+-behaviour(supervisor2).
+
+-export([start_link/2]).
+-export([init/1]).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link(Type, AmqpParams) ->
+ {ok, Sup} = supervisor2:start_link(?MODULE, []),
+ unlink(Sup),
+ {ok, ChSupSup} = supervisor2:start_child(Sup,
+ {channel_sup_sup, {amqp_channel_sup_sup, start_link,
+ [Type]},
+ intrinsic, infinity, supervisor,
+ [amqp_channel_sup_sup]}),
+ {ok, Connection} = start_connection(Sup, Type, AmqpParams, ChSupSup,
+ start_infrastructure_fun(Sup, Type)),
+ {ok, Sup, Connection}.
+
+%%---------------------------------------------------------------------------
+%% Internal plumbing
+%%---------------------------------------------------------------------------
+
+start_connection(Sup, network, AmqpParams, ChSupSup, SIF) ->
+ {ok, _} = supervisor2:start_child(
+ Sup,
+ {connection, {amqp_network_connection, start_link,
+ [AmqpParams, ChSupSup, SIF]},
+ intrinsic, brutal_kill, worker, [amqp_network_connection]});
+start_connection(Sup, direct, AmqpParams, ChSupSup, SIF) ->
+ {ok, _} = supervisor2:start_child(
+ Sup,
+ {connection, {amqp_direct_connection, start_link,
+ [AmqpParams, ChSupSup, SIF]},
+ intrinsic, brutal_kill, worker, [amqp_direct_connection]}).
+
+start_infrastructure_fun(Sup, network) ->
+ fun (Sock) ->
+ Connection = self(),
+ {ok, CTSup, {MainReader, Framing, Writer}} =
+ supervisor2:start_child(
+ Sup,
+ {connection_type_sup,
+ {amqp_connection_type_sup, start_link_network,
+ [Sock, Connection]},
+ intrinsic, infinity, supervisor,
+ [amqp_connection_type_sup]}),
+ {ok, {MainReader, Framing, Writer,
+ amqp_connection_type_sup:start_heartbeat_fun(CTSup)}}
+ end;
+start_infrastructure_fun(Sup, direct) ->
+ fun () ->
+ {ok, _CTSup, Collector} =
+ supervisor2:start_child(
+ Sup,
+ {connection_type_sup,
+ {amqp_connection_type_sup, start_link_direct, []},
+ intrinsic, infinity, supervisor,
+ [amqp_connection_type_sup]}),
+ {ok, Collector}
+ end.
+
+%%---------------------------------------------------------------------------
+%% supervisor2 callbacks
+%%---------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
92 src/amqp_connection_type_sup.erl
@@ -0,0 +1,92 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is the RabbitMQ Erlang Client.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd.,
+%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd., Cohesive Financial
+%% Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
+%% 2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
+%% Technologies Ltd.;
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): Ben Hood <0x6e6562@gmail.com>.
+
+%% @private
+-module(amqp_connection_type_sup).
+
+-include("amqp_client.hrl").
+
+-behaviour(supervisor2).
+
+-export([start_link_direct/0, start_link_network/2, start_heartbeat_fun/1]).
+-export([init/1]).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link_direct() ->
+ {ok, Sup} = supervisor2:start_link(?MODULE, []),
+ {ok, Collector} =
+ supervisor2:start_child(
+ Sup, {collector, {rabbit_queue_collector, start_link, []},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
+ {ok, Sup, Collector}.
+
+start_link_network(Sock, ConnectionPid) ->
+ {ok, Sup} = supervisor2:start_link(?MODULE, []),
+ {ok, Framing} =
+ supervisor2:start_child(
+ Sup, {framing, {rabbit_framing_channel, start_link,
+ [Sup, ConnectionPid, ?PROTOCOL]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_framing_channel]}),
+ {ok, MainReader} =
+ supervisor2:start_child(
+ Sup, {main_reader, {amqp_main_reader, start_link,
+ [Sock, Framing, ConnectionPid]},
+ intrinsic, ?MAX_WAIT, worker, [amqp_main_reader]}),
+ {ok, Writer} =
+ supervisor2:start_child(
+ Sup, {writer, {rabbit_writer, start_link,
+ [Sock, 0, ?FRAME_MIN_SIZE, ?PROTOCOL, MainReader]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}),
+ {ok, Sup, {MainReader, Framing, Writer}}.
+
+start_heartbeat_fun(Sup) ->
+ fun (_Sock, 0) ->
+ none;
+ (Sock, Timeout) ->
+ Connection = self(),
+ {ok, Sender} =
+ supervisor2:start_child(
+ Sup,
+ {heartbeat_sender, {rabbit_heartbeat, start_heartbeat_sender,
+ [Connection, Sock, Timeout]},
+ transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ {ok, Receiver} =
+ supervisor2:start_child(
+ Sup,
+ {heartbeat_receiver,
+ {rabbit_heartbeat, start_heartbeat_receiver,
+ [Connection, Sock, Timeout]},
+ transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
+ {Sender, Receiver}
+ end.
+
+%%---------------------------------------------------------------------------
+%% supervisor2 callbacks
+%%---------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
258 src/amqp_direct_connection.erl
@@ -0,0 +1,258 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is the RabbitMQ Erlang Client.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd.,
+%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd., Cohesive Financial
+%% Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
+%% 2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
+%% Technologies Ltd.;
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): Ben Hood <0x6e6562@gmail.com>.
+
+%% @private
+-module(amqp_direct_connection).
+
+-include("amqp_client.hrl").
+
+-behaviour(gen_server).
+
+-export([start_link/3, connect/1]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2]).
+
+-record(state, {sup,
+ params = #amqp_params{},
+ collector,
+ closing = false,
+ server_properties,
+ channel_sup_sup,
+ channels = amqp_channel_util:new_channel_dict(),
+ start_infrastructure_fun}).
+
+-record(closing, {reason,
+ close = none, %% At least one of close and reply has to be
+ reply = none, %% none at any given moment
+ from = none}).
+
+-define(INFO_KEYS,
+ (amqp_connection:info_keys() ++ [])).
+
+%%---------------------------------------------------------------------------
+%% Internal interface
+%%---------------------------------------------------------------------------
+
+start_link(AmqpParams, ChSupSup, SIF) ->
+ gen_server:start_link(?MODULE, [self(), AmqpParams, ChSupSup, SIF], []).
+
+connect(Pid) ->
+ gen_server:call(Pid, connect, infinity).
+
+%%---------------------------------------------------------------------------
+%% gen_server callbacks
+%%---------------------------------------------------------------------------
+
+init([Sup, AmqpParams, ChSupSup, SIF]) ->
+ {ok, #state{sup = Sup,
+ params = AmqpParams,
+ channel_sup_sup = ChSupSup,
+ start_infrastructure_fun = SIF}}.
+
+handle_call({command, Command}, From, #state{closing = Closing} = State) ->
+ case Closing of
+ false -> handle_command(Command, From, State);
+ _ -> {reply, closing, State}
+ end;
+handle_call({info, Items}, _From, State) ->
+ {reply, [{Item, i(Item, State)} || Item <- Items], State};
+handle_call(info_keys, _From, State) ->
+ {reply, ?INFO_KEYS, State};
+handle_call(connect, _From, State) ->
+ {reply, ok, do_connect(State)}.
+
+handle_cast(Message, State) ->
+ ?LOG_WARN("Connection (~p) closing: received unexpected cast ~p~n",
+ [self(), Message]),
+ {noreply, set_closing_state(abrupt, internal_error_closing(), State)}.
+
+handle_info({shutdown, Reason}, State) ->
+ {_, Code, _} = Reason,
+ if Code =:= 200 -> {stop, normal, State};
+ true -> {stop, Reason, State}
+ end;
+handle_info({'DOWN', _, process, Pid, Reason}, State) ->
+ handle_channel_exit(Pid, Reason, State).
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ State.
+
+%%---------------------------------------------------------------------------
+%% Command handling
+%%---------------------------------------------------------------------------
+
+handle_command({open_channel, ProposedNumber}, _From, State =
+ #state{collector = Collector,
+ channel_sup_sup = ChSupSup,
+ params = #amqp_params{username = User,
+ virtual_host = VHost},
+ channels = Channels}) ->
+ try amqp_channel_util:open_channel(ChSupSup, ProposedNumber,
+ ?MAX_CHANNEL_NUMBER,
+ [User, VHost, Collector], Channels) of
+ {ChannelPid, NewChannels} ->
+ {reply, {ok, ChannelPid}, State#state{channels = NewChannels}}
+ catch
+ error:out_of_channel_numbers = Error ->
+ {reply, {error, {Error, ?MAX_CHANNEL_NUMBER}}, State}
+ end;
+
+handle_command({close, Close}, From, State) ->
+ {noreply, set_closing_state(flush, #closing{reason = app_initiated_close,
+ close = Close,
+ from = From},
+ State)}.
+
+%%---------------------------------------------------------------------------
+%% Infos
+%%---------------------------------------------------------------------------
+
+i(server_properties, State) -> State#state.server_properties;
+i(is_closing, State) -> State#state.closing =/= false;
+i(amqp_params, State) -> State#state.params;
+i(num_channels, State) -> amqp_channel_util:num_channels(
+ State#state.channels);
+i(Item, _State) -> throw({bad_argument, Item}).
+
+%%---------------------------------------------------------------------------
+%% Closing
+%%---------------------------------------------------------------------------
+
+%% Changes connection's state to closing.
+%%
+%% ChannelCloseType can be flush or abrupt
+%%
+%% The precedence of the closing MainReason's is as follows:
+%% app_initiated_close, internal_error, server_initiated_close
+%% (i.e.: a given reason can override the currently set one if it is later
+%% mentioned in the above list). We can rely on erlang's comparison of atoms
+%% for this.
+set_closing_state(ChannelCloseType, Closing,
+ State = #state{closing = false, channels = Channels}) ->
+ amqp_channel_util:broadcast_to_channels(
+ {connection_closing, ChannelCloseType, closing_to_reason(Closing)},
+ Channels),
+ check_trigger_all_channels_closed_event(State#state{closing = Closing});
+%% Already closing, override situation
+set_closing_state(ChannelCloseType, NewClosing,
+ State = #state{closing = CurClosing, channels = Channels}) ->
+ %% Do not override reason in channels (because it might cause channels to
+ %% to exit with different reasons) but do cause them to close abruptly
+ %% if the new closing type requires it
+ case ChannelCloseType of
+ abrupt ->
+ amqp_channel_util:broadcast_to_channels(
+ {connection_closing, ChannelCloseType,
+ closing_to_reason(CurClosing)},
+ Channels);
+ _ -> ok
+ end,
+ ResClosing =
+ if
+ %% Override (rely on erlang's comparison of atoms)
+ NewClosing#closing.reason >= CurClosing#closing.reason ->
+ NewClosing;
+ %% Do not override
+ true ->
+ CurClosing
+ end,
+ State#state{closing = ResClosing}.
+
+%% The all_channels_closed_event is called when all channels have been closed
+%% after the connection broadcasts a connection_closing message to all channels
+all_channels_closed_event(State = #state{closing = Closing,
+ collector = Collector}) ->
+ rabbit_queue_collector:delete_all(Collector),
+ case Closing#closing.from of
+ none -> ok;
+ From -> gen_server:reply(From, ok)
+ end,
+ self() ! {shutdown, closing_to_reason(Closing)},
+ State.
+
+closing_to_reason(#closing{reason = Reason,
+ close = #'connection.close'{reply_code = Code,
+ reply_text = Text},
+ reply = none}) ->
+ {Reason, Code, Text};
+closing_to_reason(#closing{reason = Reason,
+ reply = {_, Code, Text},
+ close = none}) ->
+ {Reason, Code, Text}.
+
+internal_error_closing() ->
+ #closing{reason = internal_error,
+ reply = {internal_error, ?INTERNAL_ERROR, <<>>}}.
+
+%%---------------------------------------------------------------------------
+%% Channel utilities
+%%---------------------------------------------------------------------------
+
+unregister_channel(Pid, State = #state{channels = Channels}) ->
+ NewChannels = amqp_channel_util:unregister_channel_pid(Pid, Channels),
+ NewState = State#state{channels = NewChannels},
+ check_trigger_all_channels_closed_event(NewState).
+
+check_trigger_all_channels_closed_event(State = #state{closing = false}) ->
+ State;
+check_trigger_all_channels_closed_event(State = #state{channels = Channels}) ->
+ case amqp_channel_util:is_channel_dict_empty(Channels) of
+ true -> all_channels_closed_event(State);
+ false -> State
+ end.
+
+handle_channel_exit(Pid, Reason,
+ #state{channels = Channels, closing = Closing} = State) ->
+ case amqp_channel_util:handle_exit(Pid, Reason, Channels, Closing) of
+ stop -> {stop, Reason, State};
+ normal -> {noreply, unregister_channel(Pid, State)};
+ close -> {noreply, set_closing_state(abrupt, internal_error_closing(),
+ unregister_channel(Pid, State))};
+ other -> {noreply, set_closing_state(abrupt, internal_error_closing(),
+ State)}
+ end.
+
+%%---------------------------------------------------------------------------
+%% Connecting to the broker
+%%---------------------------------------------------------------------------
+
+do_connect(State0 = #state{params = #amqp_params{username = User,
+ password = Pass,
+ virtual_host = VHost}}) ->
+ case lists:keymember(rabbit, 1, application:which_applications()) of
+ true -> ok;
+ false -> exit(broker_not_found_in_vm)
+ end,
+ rabbit_access_control:user_pass_login(User, Pass),
+ rabbit_access_control:check_vhost_access(
+ #user{username = User, password = Pass}, VHost),
+ State1 = start_infrastructure(State0),
+ State1#state{server_properties = rabbit_reader:server_properties()}.
+
+start_infrastructure(State = #state{start_infrastructure_fun = SIF}) ->
+ {ok, Collector} = SIF(),
+ State#state{collector = Collector}.
147 src/amqp_main_reader.erl
@@ -0,0 +1,147 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is the RabbitMQ Erlang Client.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd.,
+%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd., Cohesive Financial
+%% Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
+%% 2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
+%% Technologies Ltd.;
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): Ben Hood <0x6e6562@gmail.com>.
+
+%% @private
+-module(amqp_main_reader).
+
+-include("amqp_client.hrl").
+
+-behaviour(gen_server).
+
+-export([start_link/3, register_framing_channel/3]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2]).
+
+-record(state, {sup,
+ sock,
+ connection,
+ message = none, %% none | {Type, Channel, Length}
+ framing_channels = amqp_channel_util:new_channel_dict()}).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+start_link(Sock, Framing0, Connection) ->
+ gen_server:start_link(?MODULE, [self(), Sock, Framing0, Connection], []).
+
+register_framing_channel(MainReaderPid, Number, FramingPid) ->
+ gen_server:call(MainReaderPid,
+ {register_framing_channel, Number, FramingPid}, infinity).
+
+%%---------------------------------------------------------------------------
+%% gen_server callbacks
+%%---------------------------------------------------------------------------
+
+init([Sup, Sock, Framing0, Connection]) ->
+ State0 = #state{sup = Sup, sock = Sock, connection = Connection},
+ State1 = internal_register_framing_channel(0, Framing0, State0),
+ {ok, _Ref} = rabbit_net:async_recv(Sock, 7, infinity),
+ {ok, State1}.
+
+terminate(Reason, #state{sock = Sock}) ->
+ Nice = case Reason of normal -> true;
+ shutdown -> true;
+ {shutdown, _} -> true;
+ _ -> false
+ end,
+ ok = case Nice of true -> rabbit_net:close(Sock);
+ false -> ok
+ end.
+
+code_change(_OldVsn, State, _Extra) ->
+ State.
+
+handle_call({register_framing_channel, Number, Pid}, _From, State) ->
+ {reply, ok, internal_register_framing_channel(Number, Pid, State)}.
+
+handle_cast(Cast, State) ->
+ {stop, {unexpected_cast, Cast}, State}.
+
+handle_info({inet_async, _, _, _} = InetAsync, State) ->
+ handle_inet_async(InetAsync, State);
+handle_info({'DOWN', _, _, _, _} = Down, State) ->
+ handle_down(Down, State);
+handle_info({channel_exit, Ch, Reason}, State) ->
+ {stop, {channel_died, Ch, Reason}, State}.
+
+%%---------------------------------------------------------------------------
+%% Internal plumbing
+%%---------------------------------------------------------------------------
+
+handle_inet_async({inet_async, Sock, _, Msg},
+ State = #state{sock = Sock, message = CurMessage}) ->
+ {Type, Channel, Length} = case CurMessage of {T, C, L} -> {T, C, L};
+ none -> {none, none, none}
+ end,
+ case Msg of
+ {ok, <<Payload:Length/binary, ?FRAME_END>>} ->
+ handle_frame(Type, Channel, Payload, State),
+ {ok, _Ref} = rabbit_net:async_recv(Sock, 7, infinity),
+ {noreply, State#state{message = none}};
+ {ok, <<NewType:8, NewChannel:16, NewLength:32>>} ->
+ {ok, _Ref} = rabbit_net:async_recv(Sock, NewLength + 1, infinity),
+ {noreply, State#state{message={NewType, NewChannel, NewLength}}};
+ {error, closed} ->
+ State#state.connection ! socket_closed,
+ {noreply, State};
+ {error, Reason} ->
+ {stop, {socket_error, Reason}, State}
+ end.
+
+handle_frame(Type, Channel, Payload, State) ->
+ case rabbit_reader:analyze_frame(Type, Payload, ?PROTOCOL) of
+ heartbeat when Channel /= 0 ->
+ rabbit_misc:die(frame_error);
+ %% Match heartbeats but don't do anything with them
+ heartbeat ->
+ heartbeat;
+ AnalyzedFrame ->
+ pass_frame(Channel, AnalyzedFrame, State)
+ end.
+
+pass_frame(Channel, Frame, #state{framing_channels = Channels}) ->
+ case amqp_channel_util:resolve_channel_number(Channel, Channels) of
+ undefined ->
+ ?LOG_INFO("Dropping frame ~p for invalid or closed channel "
+ "number ~p~n", [Frame, Channel]),
+ ok;
+ FramingPid ->
+ rabbit_framing_channel:process(FramingPid, Frame)
+ end.
+
+handle_down({'DOWN', _MonitorRef, process, Pid, Info},
+ State = #state{framing_channels = Channels}) ->
+ case amqp_channel_util:is_channel_pid_registered(Pid, Channels) of
+ true -> NewChannels =
+ amqp_channel_util:unregister_channel_pid(Pid, Channels),
+ {noreply, State#state{framing_channels = NewChannels}};
+ false -> {stop, {unexpected_down, Pid, Info}, State}
+ end.
+
+internal_register_framing_channel(
+ Number, Pid, State = #state{framing_channels = Channels}) ->
+ NewChannels = amqp_channel_util:register_channel(Number, Pid, Channels),
+ erlang:monitor(process, Pid),
+ State#state{framing_channels = NewChannels}.
465 src/amqp_network_connection.erl
@@ -0,0 +1,465 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is the RabbitMQ Erlang Client.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd.,
+%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd., Cohesive Financial
+%% Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C)
+%% 2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit
+%% Technologies Ltd.;
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): Ben Hood <0x6e6562@gmail.com>.
+
+%% @private
+-module(amqp_network_connection).
+
+-include("amqp_client.hrl").
+
+-behaviour(gen_server).
+
+-export([start_link/3, connect/1]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2]).
+
+-define(RABBIT_TCP_OPTS, [binary, {packet, 0}, {active,false}, {nodelay, true}]).
+-define(SOCKET_CLOSING_TIMEOUT, 1000).
+-define(CLIENT_CLOSE_TIMEOUT, 60000).
+-define(HANDSHAKE_RECEIVE_TIMEOUT, 60000).
+
+-record(state, {sup,
+ params,
+ sock,
+ max_channel,
+ heartbeat,
+ framing0,
+ writer0,
+ main_reader,
+ channel_sup_sup,
+ closing = false,
+ server_properties,
+ channels = amqp_channel_util:new_channel_dict(),
+ start_heartbeat_fun,
+ start_infrastructure_fun}).
+
+-record(closing, {reason,
+ close,
+ from = none,
+ phase = terminate_channels}).
+
+-define(INFO_KEYS, (amqp_connection:info_keys() ++
+ [max_channel, heartbeat, sock])).
+
+%%---------------------------------------------------------------------------
+%% Internal interface
+%%---------------------------------------------------------------------------
+
+start_link(AmqpParams, ChSupSup, SIF) ->
+ gen_server:start_link(?MODULE, [self(), AmqpParams, ChSupSup, SIF], []).
+
+connect(Pid) ->
+ gen_server:call(Pid, connect, infinity).
+
+%%---------------------------------------------------------------------------
+%% gen_server callbacks
+%%---------------------------------------------------------------------------
+
+init([Sup, AmqpParams, ChSupSup, SIF]) ->
+ {ok, #state{sup = Sup,
+ params = AmqpParams,
+ channel_sup_sup = ChSupSup,
+ start_infrastructure_fun = SIF}}.
+
+handle_call({command, Command}, From, #state{closing = false} = State) ->
+ handle_command(Command, From, State);
+handle_call({command, _Command}, _From, State) ->
+ {reply, closing, State};
+handle_call({info, Items}, _From, State) ->
+ {reply, [{Item, i(Item, State)} || Item <- Items], State};
+handle_call(info_keys, _From, State) ->
+ {reply, ?INFO_KEYS, State};
+handle_call(connect, _From, State) ->
+ {reply, ok, do_connect(State)}.
+
+%% Standard handling of a method sent by the broker (this is received from
+%% framing0)
+handle_cast({method, Method, Content}, State) ->
+ handle_method(Method, Content, State).
+
+%% This is received after we have sent 'connection.close' to the server
+%% but timed out waiting for 'connection.close_ok' back
+handle_info(timeout_waiting_for_close_ok, State = #state{closing = Closing}) ->
+ #closing{phase = wait_close_ok} = Closing, % assertion
+ {stop, {timeout_waiting_for_close_ok, closing_to_reason(Closing)}, State};
+%% This is received when the main reader has not sent socket_closed in time
+handle_info(socket_closing_timeout, State = #state{closing = Closing}) ->
+ #closing{phase = wait_socket_close} = Closing, % assertion
+ {stop, {socket_closing_timeout, closing_to_reason(Closing)}, State};
+%% Received from main reader
+handle_info(socket_closed, State) ->
+ handle_socket_closed(State);
+%% DOWN signals from channels
+handle_info({'DOWN', _, process, Pid, Reason}, State) ->
+ handle_channel_exit(Pid, Reason, State);
+%% timeout from heartbeat receiver
+handle_info(timeout, State) ->
+ {stop, heartbeat_timeout, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ State.
+
+%%---------------------------------------------------------------------------
+%% Command handling
+%%---------------------------------------------------------------------------
+
+handle_command({open_channel, ProposedNumber}, _From,
+ State = #state{sock = Sock,
+ channels = Channels,
+ max_channel = MaxChannel,
+ main_reader = MainReader,
+ channel_sup_sup = ChSupSup}) ->
+ try amqp_channel_util:open_channel(ChSupSup, ProposedNumber, MaxChannel,
+ [Sock, MainReader], Channels) of
+ {ChannelPid, NewChannels} ->
+ {reply, {ok, ChannelPid}, State#state{channels = NewChannels}}
+ catch
+ error:out_of_channel_numbers = Error ->
+ {reply, {error, {Error, MaxChannel}}, State}
+ end;
+
+handle_command({close, #'connection.close'{} = Close}, From, State) ->
+ {noreply, set_closing_state(flush, #closing{reason = app_initiated_close,
+ close = Close,
+ from = From},
+ State)}.
+
+%%---------------------------------------------------------------------------
+%% Handling methods from broker
+%%---------------------------------------------------------------------------
+
+handle_method(#'connection.close'{} = Close, none, State) ->
+ {noreply, set_closing_state(abrupt,
+ #closing{reason = server_initiated_close,
+ close = Close},
+ State)};
+
+handle_method(#'connection.close_ok'{}, none,
+ State = #state{closing = Closing}) ->
+ #closing{from = From,
+ close = #'connection.close'{reply_code = ReplyCode}} = Closing,
+ case From of
+ none -> ok;
+ _ -> gen_server:reply(From, ok)
+ end,
+ if ReplyCode =:= 200 -> {stop, normal, State};
+ true -> {stop, closing_to_reason(Closing), State}
+ end.
+
+%%---------------------------------------------------------------------------
+%% Infos
+%%---------------------------------------------------------------------------
+
+i(server_properties, State) -> State#state.server_properties;
+i(is_closing, State) -> State#state.closing =/= false;
+i(amqp_params, State) -> State#state.params;
+i(max_channel, State) -> State#state.max_channel;
+i(heartbeat, State) -> State#state.heartbeat;
+i(sock, State) -> State#state.sock;
+i(num_channels, State) -> amqp_channel_util:num_channels(
+ State#state.channels);
+i(Item, _State) -> throw({bad_argument, Item}).
+
+%%---------------------------------------------------------------------------
+%% Closing
+%%---------------------------------------------------------------------------
+
+%% Changes connection's state to closing.
+%%
+%% ChannelCloseType can be flush or abrupt
+%%
+%% The closing reason (Closing#closing.reason) can be one of the following
+%% app_initiated_close - app has invoked the close/{1,3} command. In this
+%% case the close field is the method to be sent to the server after all
+%% the channels have terminated (and flushed); the from field is the
+%% process that initiated the call and to whom the server must reply.
+%% phase = terminate_channels | wait_close_ok
+%% internal_error - there was an internal error either in a channel or in
+%% the connection process. close field is the method to be sent to the
+%% server after all channels have been abruptly terminated (do not flush
+%% in this case).
+%% phase = terminate_channels | wait_close_ok
+%% server_initiated_close - server has sent 'connection.close'. close field
+%% is the method sent by the server.
+%% phase = terminate_channels | wait_socket_close
+%%
+%% The precedence of the closing MainReason's is as follows:
+%% app_initiated_close, internal_error, server_initiated_close
+%% (i.e.: a given reason can override the currently set one if it is later
+%% mentioned in the above list). We can rely on erlang's comparison of atoms
+%% for this.
+set_closing_state(ChannelCloseType, Closing,
+ #state{closing = false,
+ channels = Channels} = State) ->
+ amqp_channel_util:broadcast_to_channels(
+ {connection_closing, ChannelCloseType, closing_to_reason(Closing)},
+ Channels),
+ check_trigger_all_channels_closed_event(State#state{closing = Closing});
+%% Already closing, override situation
+set_closing_state(ChannelCloseType, NewClosing,
+ #state{closing = CurClosing,
+ channels = Channels} = State) ->
+ %% Do not override reason in channels (because it might cause channels
+ %% to exit with different reasons) but do cause them to close abruptly
+ %% if the new closing type requires it
+ case ChannelCloseType of
+ abrupt ->
+ amqp_channel_util:broadcast_to_channels(
+ {connection_closing, ChannelCloseType,
+ closing_to_reason(CurClosing)},
+ Channels);
+ _ -> ok
+ end,
+ #closing{reason = NewReason, close = NewClose} = NewClosing,
+ #closing{reason = CurReason} = CurClosing,
+ ResClosing =
+ if
+ %% Override (rely on erlang's comparison of atoms)
+ NewReason >= CurReason ->
+ %% Note that when overriding, we keep the current phase
+ CurClosing#closing{reason = NewReason, close = NewClose};
+ %% Do not override
+ true ->
+ CurClosing
+ end,
+ NewState = State#state{closing = ResClosing},
+ %% Now check if it's the case that the server has sent a connection.close
+ %% while we were in the closing state (for whatever reason). We need to
+ %% send connection.close_ok (it might be even be the case that we are
+ %% sending it again) and wait for the socket to close.
+ case NewReason of
+ server_initiated_close -> all_channels_closed_event(NewState);
+ _ -> NewState
+ end.
+
+%% The all_channels_closed_event is called when all channels have been closed
+%% after the connection broadcasts a connection_closing message to all channels
+all_channels_closed_event(#state{writer0 = Writer,
+ closing = Closing} = State) ->
+ #closing{reason = Reason, close = Close} = Closing,
+ case Reason of
+ server_initiated_close ->
+ amqp_channel_util:do(network, Writer, #'connection.close_ok'{},
+ none),
+ erlang:send_after(?SOCKET_CLOSING_TIMEOUT, self(),
+ socket_closing_timeout),
+ State#state{closing = Closing#closing{phase = wait_socket_close}};
+ _ ->
+ amqp_channel_util:do(network, Writer, Close, none),
+ erlang:send_after(?CLIENT_CLOSE_TIMEOUT, self(),
+ timeout_waiting_for_close_ok),
+ State#state{closing = Closing#closing{phase = wait_close_ok}}
+ end.
+
+closing_to_reason(#closing{reason = Reason,
+ close = #'connection.close'{reply_code = Code,
+ reply_text = Text}}) ->
+ {Reason, Code, Text}.
+
+internal_error_closing() ->
+ #closing{reason = internal_error,
+ close = #'connection.close'{reply_text = <<>>,
+ reply_code = ?INTERNAL_ERROR,
+ class_id = 0,
+ method_id = 0}}.
+
+handle_socket_closed(State = #state{
+ closing = Closing = #closing{
+ phase = wait_socket_close}}) ->
+ {stop, closing_to_reason(Closing), State};
+handle_socket_closed(State) ->
+ {stop, socket_closed_unexpectedly, State}.
+
+%%---------------------------------------------------------------------------
+%% Channel utilities
+%%---------------------------------------------------------------------------
+
+unregister_channel(Pid, State = #state{channels = Channels}) ->
+ NewChannels = amqp_channel_util:unregister_channel_pid(Pid, Channels),
+ NewState = State#state{channels = NewChannels},
+ check_trigger_all_channels_closed_event(NewState).
+
+check_trigger_all_channels_closed_event(#state{closing = false} = State) ->
+ State;
+check_trigger_all_channels_closed_event(#state{channels = Channels,
+ closing = Closing} = State) ->
+ #closing{phase = terminate_channels} = Closing, %% assertion
+ case amqp_channel_util:is_channel_dict_empty(Channels) of
+ true -> all_channels_closed_event(State);
+ false -> State
+ end.
+
+handle_channel_exit(Pid, Reason,
+ #state{channels = Channels, closing = Closing} = State) ->
+ case amqp_channel_util:handle_exit(Pid, Reason, Channels, Closing) of
+ stop -> {stop, Reason, State};
+ normal -> {noreply, unregister_channel(Pid, State)};
+ close -> {noreply, set_closing_state(abrupt, internal_error_closing(),
+ unregister_channel(Pid, State))};
+ other -> {noreply, set_closing_state(abrupt, internal_error_closing(),
+ State)}
+ end.
+
+%%---------------------------------------------------------------------------
+%% Handshake
+%%---------------------------------------------------------------------------
+
+do_connect(State = #state{params = #amqp_params{host = Host,
+ port = Port,
+ ssl_options = none}}) ->
+ case gen_tcp:connect(Host, Port, ?RABBIT_TCP_OPTS) of
+ {ok, Sock} -> handshake(State#state{sock = Sock});
+ {error, Reason} -> ?LOG_WARN("Could not start the network driver: ~p~n",
+ [Reason]),
+ exit(Reason)
+ end;
+do_connect(State = #state{params = #amqp_params{host = Host,
+ port = Port,
+ ssl_options = SslOpts}}) ->
+ rabbit_misc:start_applications([crypto, ssl]),
+ case gen_tcp:connect(Host, Port, ?RABBIT_TCP_OPTS) of
+ {ok, Sock} ->
+ case ssl:connect(Sock, SslOpts) of
+ {ok, SslSock} ->
+ RabbitSslSock = #ssl_socket{ssl = SslSock, tcp = Sock},
+ handshake(State#state{sock = RabbitSslSock});
+ {error, Reason} ->
+ ?LOG_WARN("Could not upgrade the network driver to ssl: "
+ "~p~n", [Reason]),
+ exit(Reason)
+ end;
+ {error, Reason} ->
+ ?LOG_WARN("Could not start the network driver: ~p~n", [Reason]),
+ exit(Reason)
+ end.
+
+handshake(State0 = #state{sock = Sock}) ->
+ ok = rabbit_net:send(Sock, ?PROTOCOL_HEADER),
+ State1 = start_infrastructure(State0),