Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Updated to RabbitMQ 2.8.1

  • Loading branch information...
commit ce9d2b8257adcf70c62e1bf573cd86d419a76c21 1 parent ebe608c
@jbrisbin authored
View
9 README.md
@@ -5,21 +5,18 @@ This is a fork of the [official RabbitMQ/AMQP Erlang client](https://github.com/
It's meant to be included in your rebar projects in your rebar.config file:
{deps, [
- {amqp_client, ".*", {git, "git://github.com/jbrisbin/amqp_client.git", {tag, "rabbitmq_2.7.1"}}}
+ {amqp_client, ".*", {git, "git://github.com/jbrisbin/amqp_client.git", {tag, "rabbitmq-2.8.1"}}}
]}.
The "master" branch of this port is a simple re-packaging of the rabbit_common AMQP client dependency.
-The "community" branch, however, is a port of the RabbitMQ source code with additional strict compilation
-checking turned on and the source code edited to eliminate warnings. It should be 100% compatible with the
-unaltered source code. The community branch is simply a tweak to allow projects that depend on rabbit_common
-and also have strict compilation options turned on with this project introducing warnings into those projects.
+The "community" branch, however, is a port of the RabbitMQ source code with additional strict compilation checking turned on and the source code edited to eliminate warnings. It should be 100% compatible with the unaltered source code. The community branch is simply a tweak to allow projects that depend on rabbit_common to not have to deal with the warnings issued by the compiler in the unaltered RabbitMQ code.
To use the "community" branch in your project, which includes stricter compilation settings, add "-community"
to the version tag:
{deps, [
- {amqp_client, ".*", {git, "git://github.com/jbrisbin/amqp_client.git", {tag, "rabbitmq_2.7.1-community"}}}
+ {amqp_client, ".*", {git, "git://github.com/jbrisbin/amqp_client.git", {tag, "rabbitmq-2.8.1-community"}}}
]}.
### License
View
2  include/amqp_client.hrl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-ifndef(AMQP_CLIENT_HRL).
View
4 include/amqp_gen_consumer_spec.hrl
@@ -11,11 +11,12 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
-include("amqp_client.hrl").
+-ifndef(edoc).
-type(state() :: any()).
-type(consume() :: #'basic.consume'{}).
-type(consume_ok() :: #'basic.consume_ok'{}).
@@ -38,3 +39,4 @@
{reply, any(), state()} | {noreply, state()} |
{error, reason(), state()}).
-spec(terminate/2 :: (any(), state()) -> state()).
+-endif.
View
2  rebar.config
@@ -1,5 +1,5 @@
{deps, [
- {rabbit_common, ".*", {git, "git://github.com/jbrisbin/rabbit_common.git", {tag, "rabbitmq_2.7.1"}}}
+ {rabbit_common, ".*", {git, "git://github.com/jbrisbin/rabbit_common.git", {tag, "rabbitmq-2.8.1"}}}
]}.
{erl_opts, [
View
2  src/amqp_auth_mechanisms.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% @private
View
180 src/amqp_channel.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% @type close_reason(Type) = {shutdown, amqp_reason(Type)}.
@@ -66,13 +66,13 @@
-behaviour(gen_server).
--export([call/2, call/3, cast/2, cast/3]).
+-export([call/2, call/3, cast/2, cast/3, cast_flow/3]).
-export([close/1, close/3]).
-export([register_return_handler/2, register_flow_handler/2,
register_confirm_handler/2]).
-export([call_consumer/2, subscribe/3]).
--export([next_publish_seqno/1, wait_for_confirms/1,
- wait_for_confirms_or_die/1]).
+-export([next_publish_seqno/1, wait_for_confirms/1, wait_for_confirms/2,
+ wait_for_confirms_or_die/1, wait_for_confirms_or_die/2]).
-export([start_link/5, connection_closing/3, open/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
@@ -158,7 +158,7 @@ call(Channel, Method, Content) ->
%% @spec (Channel, Method) -> ok
%% @doc This is equivalent to amqp_channel:cast(Channel, Method, none).
cast(Channel, Method) ->
- gen_server:cast(Channel, {cast, Method, none, self()}).
+ gen_server:cast(Channel, {cast, Method, none, self(), noflow}).
%% @spec (Channel, Method, Content) -> ok
%% where
@@ -170,7 +170,17 @@ cast(Channel, Method) ->
%% This function is not recommended with synchronous methods, since there is no
%% way to verify that the server has received the method.
cast(Channel, Method, Content) ->
- gen_server:cast(Channel, {cast, Method, Content, self()}).
+ gen_server:cast(Channel, {cast, Method, Content, self(), noflow}).
+
+%% @spec (Channel, Method, Content) -> ok
+%% where
+%% Channel = pid()
+%% Method = amqp_method()
+%% Content = amqp_msg() | none
+%% @doc Like cast/3, with flow control.
+cast_flow(Channel, Method, Content) ->
+ credit_flow:send(Channel),
+ gen_server:cast(Channel, {cast, Method, Content, self(), flow}).
%% @spec (Channel) -> ok | closing
%% where
@@ -198,14 +208,25 @@ close(Channel, Code, Text) ->
next_publish_seqno(Channel) ->
gen_server:call(Channel, next_publish_seqno, infinity).
-%% @spec (Channel) -> boolean()
+%% @spec (Channel) -> boolean() | 'timeout'
%% where
%% Channel = pid()
%% @doc Wait until all messages published since the last call have
%% been either ack'd or nack'd by the broker. Note, when called on a
%% non-Confirm channel, waitForConfirms returns true immediately.
wait_for_confirms(Channel) ->
- gen_server:call(Channel, wait_for_confirms, infinity).
+ wait_for_confirms(Channel, infinity).
+
+%% @spec (Channel, Timeout) -> boolean() | 'timeout'
+%% where
+%% Channel = pid()
+%% Timeout = non_neg_integer() | 'infinity'
+%% @doc Wait until all messages published since the last call have
+%% been either ack'd or nack'd by the broker or the timeout expires.
+%% Note, when called on a non-Confirm channel, waitForConfirms returns
+%% true immediately.
+wait_for_confirms(Channel, Timeout) ->
+ gen_server:call(Channel, {wait_for_confirms, Timeout}, infinity).
%% @spec (Channel) -> true
%% where
@@ -214,7 +235,24 @@ wait_for_confirms(Channel) ->
%% received, the calling process is immediately sent an
%% exit(nack_received).
wait_for_confirms_or_die(Channel) ->
- gen_server:call(Channel, {wait_for_confirms_or_die, self()}, infinity).
+ wait_for_confirms_or_die(Channel, infinity).
+
+%% @spec (Channel, Timeout) -> true
+%% where
+%% Channel = pid()
+%% Timeout = non_neg_integer() | 'infinity'
+%% @doc Behaves the same as wait_for_confirms/1, but if a nack is
+%% received, the calling process is immediately sent an
+%% exit(nack_received). If the timeout expires, the calling process is
+%% sent an exit(timeout).
+wait_for_confirms_or_die(Channel, Timeout) ->
+ case wait_for_confirms(Channel, Timeout) of
+ timeout -> close(Channel, 200, <<"Confirm Timeout">>),
+ exit(timeout);
+ false -> close(Channel, 200, <<"Nacks Received">>),
+ exit(nacks_received);
+ true -> true
+ end.
%% @spec (Channel, ReturnHandler) -> ok
%% where
@@ -296,13 +334,13 @@ init([Driver, Connection, ChannelNumber, Consumer, SWF]) ->
%% @private
handle_call(open, From, State) ->
- {noreply, rpc_top_half(#'channel.open'{}, none, From, none, State)};
+ {noreply, rpc_top_half(#'channel.open'{}, none, From, none, noflow, State)};
%% @private
handle_call({close, Code, Text}, From, State) ->
handle_close(Code, Text, From, State);
%% @private
handle_call({call, Method, AmqpMsg, Sender}, From, State) ->
- handle_method_to_server(Method, AmqpMsg, From, Sender, State);
+ handle_method_to_server(Method, AmqpMsg, From, Sender, noflow, State);
%% Handles the delivery of messages from a direct channel
%% @private
handle_call({send_command_sync, Method, Content}, From, State) ->
@@ -319,25 +357,23 @@ handle_call({send_command_sync, Method}, From, State) ->
handle_call(next_publish_seqno, _From,
State = #state{next_pub_seqno = SeqNo}) ->
{reply, SeqNo, State};
-
-handle_call(wait_for_confirms, From, State) ->
- handle_wait_for_confirms(From, none, true, State);
-
-%% Lets the channel know that the process should be sent an exit
-%% signal if a nack is received.
-handle_call({wait_for_confirms_or_die, Pid}, From, State) ->
- handle_wait_for_confirms(From, Pid, ok, State);
+handle_call({wait_for_confirms, Timeout}, From, State) ->
+ handle_wait_for_confirms(From, Timeout, State);
%% @private
handle_call({call_consumer, Msg}, _From,
State = #state{consumer = Consumer}) ->
{reply, amqp_gen_consumer:call_consumer(Consumer, Msg), State};
%% @private
handle_call({subscribe, BasicConsume, Subscriber}, From, State) ->
- handle_method_to_server(BasicConsume, none, From, Subscriber, State).
+ handle_method_to_server(BasicConsume, none, From, Subscriber, noflow,
+ State).
%% @private
-handle_cast({cast, Method, AmqpMsg, Sender}, State) ->
- handle_method_to_server(Method, AmqpMsg, none, Sender, State);
+handle_cast({cast, Method, AmqpMsg, Sender, noflow}, State) ->
+ handle_method_to_server(Method, AmqpMsg, none, Sender, noflow, State);
+handle_cast({cast, Method, AmqpMsg, Sender, flow}, State) ->
+ credit_flow:ack(Sender),
+ handle_method_to_server(Method, AmqpMsg, none, Sender, flow, State);
%% @private
handle_cast({register_return_handler, ReturnHandler}, State) ->
erlang:monitor(process, ReturnHandler),
@@ -352,7 +388,7 @@ handle_cast({register_flow_handler, FlowHandler}, State) ->
{noreply, State#state{flow_handler_pid = FlowHandler}};
%% Received from channels manager
%% @private
-handle_cast({method, Method, Content}, State) ->
+handle_cast({method, Method, Content, noflow}, State) ->
handle_method_from_server(Method, Content, State);
%% Handles the situation when the connection closes without closing the channel
%% beforehand. The channel must block all further RPCs,
@@ -387,6 +423,10 @@ handle_info({channel_closing, ChPid}, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
{noreply, State};
%% @private
+handle_info({bump_credit, Msg}, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ {noreply, State};
+%% @private
handle_info(timed_out_flushing_channel, State) ->
?LOG_WARN("Channel (~p) closing: timed out flushing while "
"connection closing~n", [self()]),
@@ -408,7 +448,18 @@ handle_info({'DOWN', _, process, FlowHandler, Reason},
State = #state{flow_handler_pid = FlowHandler}) ->
?LOG_WARN("Channel (~p): Unregistering flow handler ~p because it died. "
"Reason: ~p~n", [self(), FlowHandler, Reason]),
- {noreply, State#state{flow_handler_pid = none}}.
+ {noreply, State#state{flow_handler_pid = none}};
+handle_info({'DOWN', _, process, QPid, _Reason}, State) ->
+ rabbit_amqqueue:notify_sent_queue_down(QPid),
+ {noreply, State};
+handle_info({confirm_timeout, From}, State = #state{waiting_set = WSet}) ->
+ case gb_trees:lookup(From, WSet) of
+ none ->
+ {noreply, State};
+ {value, _} ->
+ gen_server:reply(From, timeout),
+ {noreply, State#state{waiting_set = gb_trees:delete(From, WSet)}}
+ end.
%% @private
terminate(_Reason, State) ->
@@ -422,7 +473,7 @@ code_change(_OldVsn, State, _Extra) ->
%% RPC mechanism
%%---------------------------------------------------------------------------
-handle_method_to_server(Method, AmqpMsg, From, Sender,
+handle_method_to_server(Method, AmqpMsg, From, Sender, Flow,
State = #state{unconfirmed_set = USet}) ->
case {check_invalid_method(Method), From,
check_block(Method, AmqpMsg, State)} of
@@ -440,7 +491,7 @@ handle_method_to_server(Method, AmqpMsg, From, Sender,
State
end,
{noreply, rpc_top_half(Method, build_content(AmqpMsg),
- From, Sender, State1)};
+ From, Sender, Flow, State1)};
{ok, none, BlockReply} ->
?LOG_WARN("Channel (~p): discarding method ~p in cast.~n"
"Reason: ~p~n", [self(), Method, BlockReply]),
@@ -461,22 +512,23 @@ handle_close(Code, Text, From, State) ->
class_id = 0,
method_id = 0},
case check_block(Close, none, State) of
- ok -> {noreply, rpc_top_half(Close, none, From, none, State)};
+ ok -> {noreply, rpc_top_half(Close, none, From, none, noflow,
+ State)};
BlockReply -> {reply, BlockReply, State}
end.
-rpc_top_half(Method, Content, From, Sender,
+rpc_top_half(Method, Content, From, Sender, Flow,
State0 = #state{rpc_requests = RequestQueue}) ->
State1 = State0#state{
- rpc_requests =
- queue:in({From, Sender, Method, Content}, RequestQueue)},
+ rpc_requests = queue:in({From, Sender, Method, Content, Flow},
+ RequestQueue)},
IsFirstElement = queue:is_empty(RequestQueue),
if IsFirstElement -> do_rpc(State1);
true -> State1
end.
rpc_bottom_half(Reply, State = #state{rpc_requests = RequestQueue}) ->
- {{value, {From, _Sender, _Method, _Content}}, RequestQueue1} =
+ {{value, {From, _Sender, _Method, _Content, _Flow}}, RequestQueue1} =
queue:out(RequestQueue),
case From of
none -> ok;
@@ -487,9 +539,9 @@ rpc_bottom_half(Reply, State = #state{rpc_requests = RequestQueue}) ->
do_rpc(State = #state{rpc_requests = Q,
closing = Closing}) ->
case queue:out(Q) of
- {{value, {From, Sender, Method, Content}}, NewQ} ->
+ {{value, {From, Sender, Method, Content, Flow}}, NewQ} ->
State1 = pre_do(Method, Content, Sender, State),
- DoRet = do(Method, Content, State1),
+ DoRet = do(Method, Content, Flow, State1),
case ?PROTOCOL:is_method_synchronous(Method) of
true -> State1;
false -> case {From, DoRet} of
@@ -513,7 +565,7 @@ do_rpc(State = #state{rpc_requests = Q,
end.
pending_rpc_method(#state{rpc_requests = Q}) ->
- {value, {_From, _Sender, Method, _Content}} = queue:peek(Q),
+ {value, {_From, _Sender, Method, _Content, _Flow}} = queue:peek(Q),
Method.
pre_do(#'channel.open'{}, none, _Sender, State) ->
@@ -562,13 +614,13 @@ handle_method_from_server1(#'channel.close'{reply_code = Code,
State = #state{closing = {just_channel, _}}) ->
%% Both client and server sent close at the same time. Don't shutdown yet,
%% wait for close_ok.
- do(#'channel.close_ok'{}, none, State),
+ do(#'channel.close_ok'{}, none, noflow, State),
{noreply,
State#state{
closing = {just_channel, {server_initiated_close, Code, Text}}}};
handle_method_from_server1(#'channel.close'{reply_code = Code,
reply_text = Text}, none, State) ->
- do(#'channel.close_ok'{}, none, State),
+ do(#'channel.close_ok'{}, none, noflow, State),
handle_shutdown({server_initiated_close, Code, Text}, State);
handle_method_from_server1(#'channel.close_ok'{}, none,
State = #state{closing = Closing}) ->
@@ -604,7 +656,7 @@ handle_method_from_server1(#'channel.flow'{active = Active} = Flow, none,
%% flushed beforehand. Methods that made it to the queue are not
%% blocked in any circumstance.
{noreply, rpc_top_half(#'channel.flow_ok'{active = Active}, none, none,
- none, State#state{flow_active = Active})};
+ none, noflow, State#state{flow_active = Active})};
handle_method_from_server1(
#'basic.return'{} = BasicReturn, AmqpMsg,
State = #state{return_handler_pid = ReturnHandler}) ->
@@ -629,12 +681,12 @@ handle_method_from_server1(#'basic.nack'{} = BasicNack, none,
#state{confirm_handler_pid = none} = State) ->
?LOG_WARN("Channel (~p): received ~p but there is no "
"confirm handler registered~n", [self(), BasicNack]),
- {noreply, update_confirm_set(BasicNack, handle_nack(State))};
+ {noreply, update_confirm_set(BasicNack, State)};
handle_method_from_server1(
#'basic.nack'{} = BasicNack, none,
#state{confirm_handler_pid = ConfirmHandler} = State) ->
ConfirmHandler ! BasicNack,
- {noreply, update_confirm_set(BasicNack, handle_nack(State))};
+ {noreply, update_confirm_set(BasicNack, State)};
handle_method_from_server1(Method, none, State) ->
{noreply, rpc_bottom_half(Method, State)};
@@ -693,14 +745,15 @@ handle_shutdown(Reason, State) ->
%% Internal plumbing
%%---------------------------------------------------------------------------
-do(Method, Content, #state{driver = Driver, writer = W}) ->
+do(Method, Content, Flow, #state{driver = Driver, writer = W}) ->
%% Catching because it expects the {channel_exit, _, _} message on error
- catch case {Driver, Content} of
- {network, none} -> rabbit_writer:send_command_sync(W, Method);
- {network, _} -> rabbit_writer:send_command_sync(W, Method,
- Content);
- {direct, none} -> rabbit_channel:do(W, Method);
- {direct, _} -> rabbit_channel:do(W, Method, Content)
+ catch case {Driver, Content, Flow} of
+ {network, none, _} -> rabbit_writer:send_command_sync(W, Method);
+ {network, _, _} -> rabbit_writer:send_command_sync(W, Method,
+ Content);
+ {direct, none, _} -> rabbit_channel:do(W, Method);
+ {direct, _, flow} -> rabbit_channel:do_flow(W, Method, Content);
+ {direct, _, noflow} -> rabbit_channel:do(W, Method, Content)
end.
start_writer(State = #state{start_writer_fun = SWF}) ->
@@ -757,14 +810,6 @@ server_misbehaved(#amqp_error{} = AmqpError, State = #state{number = Number}) ->
{noreply, State}
end.
-handle_nack(State = #state{waiting_set = WSet}) ->
- DyingPids = [Pid || {_, Pid} <- gb_trees:to_list(WSet), Pid =/= none],
- case DyingPids of
- [] -> State;
- _ -> [exit(Pid, nack_received) || Pid <- DyingPids],
- close(self(), 200, <<"Nacks Received">>)
- end.
-
update_confirm_set(#'basic.ack'{delivery_tag = SeqNo,
multiple = Multiple},
State = #state{unconfirmed_set = USet}) ->
@@ -796,20 +841,33 @@ maybe_notify_waiters(State = #state{unconfirmed_set = USet}) ->
true -> notify_confirm_waiters(State)
end.
-notify_confirm_waiters(State = #state{waiting_set = WSet,
+notify_confirm_waiters(State = #state{waiting_set = WSet,
only_acks_received = OAR}) ->
- [gen_server:reply(From, OAR) || {From, _} <- gb_trees:to_list(WSet)],
- State#state{waiting_set = gb_trees:empty(),
+ [begin
+ safe_cancel_timer(TRef),
+ gen_server:reply(From, OAR)
+ end || {From, TRef} <- gb_trees:to_list(WSet)],
+ State#state{waiting_set = gb_trees:empty(),
only_acks_received = true}.
-handle_wait_for_confirms(From, Notify, EmptyReply,
+handle_wait_for_confirms(From, Timeout,
State = #state{unconfirmed_set = USet,
waiting_set = WSet}) ->
case gb_sets:is_empty(USet) of
- true -> {reply, EmptyReply, State};
- false -> {noreply, State#state{waiting_set =
- gb_trees:insert(From, Notify, WSet)}}
+ true ->
+ {reply, true, State};
+ false ->
+ TRef = case Timeout of
+ infinity -> undefined;
+ _ -> erlang:send_after(Timeout * 1000, self(),
+ {confirm_timeout, From})
+ end,
+ {noreply,
+ State#state{waiting_set = gb_trees:insert(From, TRef, WSet)}}
end.
call_to_consumer(Method, Args, #state{consumer = Consumer}) ->
amqp_gen_consumer:call_consumer(Consumer, Method, Args).
+
+safe_cancel_timer(undefined) -> ok;
+safe_cancel_timer(TRef) -> erlang:cancel_timer(TRef).
View
7 src/amqp_channel_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% @private
@@ -44,12 +44,13 @@ start_link(Type, Connection, InfraArgs, ChNumber, Consumer = {_, _}) ->
%% Internal plumbing
%%---------------------------------------------------------------------------
-start_writer_fun(_Sup, direct, [ConnectionPid, Node, User, VHost, Collector],
+start_writer_fun(_Sup, direct, [ConnPid, ConnName, Node, User, VHost,
+ Collector],
ChNumber) ->
fun () ->
{ok, RabbitCh} =
rpc:call(Node, rabbit_direct, start_channel,
- [ChNumber, self(), ConnectionPid, ?PROTOCOL, User,
+ [ChNumber, self(), ConnPid, ConnName, ?PROTOCOL, User,
VHost, ?CLIENT_CAPABILITIES, Collector]),
link(RabbitCh),
{ok, RabbitCh}
View
2  src/amqp_channel_sup_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% @private
View
24 src/amqp_channels_manager.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% @private
@@ -22,7 +22,8 @@
-behaviour(gen_server).
-export([start_link/2, open_channel/4, set_channel_max/2, is_empty/1,
- num_channels/1, pass_frame/3, signal_connection_closing/3]).
+ num_channels/1, pass_frame/3, signal_connection_closing/3,
+ process_channel_frame/4]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
@@ -59,6 +60,19 @@ pass_frame(ChMgr, ChNumber, Frame) ->
signal_connection_closing(ChMgr, ChannelCloseType, Reason) ->
gen_server:cast(ChMgr, {connection_closing, ChannelCloseType, Reason}).
+process_channel_frame(Frame, Channel, ChPid, AState) ->
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} -> NewAState;
+ {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
+ NewAState;
+ {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid, Method,
+ Content),
+ NewAState;
+ {error, Reason} -> ChPid ! {channel_exit, Channel,
+ Reason},
+ AState
+ end.
+
%%---------------------------------------------------------------------------
%% gen_server callbacks
%%---------------------------------------------------------------------------
@@ -195,10 +209,10 @@ internal_pass_frame(Number, Frame, State) ->
case internal_lookup_npa(Number, State) of
undefined ->
?LOG_INFO("Dropping frame ~p for invalid or closed "
- "channel number ~p~n", [Frame, Number]);
+ "channel number ~p~n", [Frame, Number]),
+ State;
{ChPid, AState} ->
- NewAState = rabbit_reader:process_channel_frame(
- Frame, ChPid, Number, ChPid, AState),
+ NewAState = process_channel_frame(Frame, Number, ChPid, AState),
internal_update_npa(Number, ChPid, NewAState, State)
end.
View
2  src/amqp_client.app.src
@@ -1,6 +1,6 @@
{application, amqp_client,
[{description, "RabbitMQ Erlang Client Library"},
- {vsn, "2.7.1"},
+ {vsn, "2.8.1"},
{modules, []},
{registered, []},
{env, []},
View
2  src/amqp_client.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% @private
View
2  src/amqp_connection.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% @type close_reason(Type) = {shutdown, amqp_reason(Type)}.
View
2  src/amqp_connection_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% @private
View
2  src/amqp_connection_type_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% @private
View
20 src/amqp_direct_connection.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% @private
@@ -21,6 +21,8 @@
-behaviour(amqp_gen_connection).
+-export([server_close/3]).
+
-export([init/1, terminate/2, connect/4, do/2, open_channel_args/1, i/2,
info_keys/0, handle_message/2, closing/3, channels_terminated/1]).
@@ -41,14 +43,24 @@
%%---------------------------------------------------------------------------
+%% amqp_connection:close() logically closes from the client end. We may
+%% want to close from the server end.
+server_close(ConnectionPid, Code, Text) ->
+ Close = #'connection.close'{reply_text = Text,
+ reply_code = Code,
+ class_id = 0,
+ method_id = 0},
+ amqp_gen_connection:server_close(ConnectionPid, Close).
+
init([]) ->
{ok, #state{}}.
open_channel_args(#state{node = Node,
user = User,
vhost = VHost,
+ adapter_info = Info,
collector = Collector}) ->
- [self(), Node, User, VHost, Collector].
+ [self(), Info#adapter_info.name, Node, User, VHost, Collector].
do(_Method, _State) ->
ok.
@@ -129,9 +141,7 @@ ensure_adapter_info(A = #adapter_info{protocol = unknown}) ->
ensure_adapter_info(A#adapter_info{protocol =
{'Direct', ?PROTOCOL:version()}});
-ensure_adapter_info(A = #adapter_info{name = unknown,
- peer_address = unknown,
- peer_port = unknown}) ->
+ensure_adapter_info(A = #adapter_info{name = unknown}) ->
Name = list_to_binary(rabbit_misc:pid_to_string(self())),
ensure_adapter_info(A#adapter_info{name = Name});
View
2  src/amqp_direct_consumer.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
%% @doc This module is an implementation of the amqp_gen_consumer
View
21 src/amqp_gen_connection.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% @private
@@ -23,7 +23,7 @@
-export([start_link/5, connect/1, open_channel/3, hard_error_in_channel/3,
channel_internal_error/3, server_misbehaved/2, channels_terminated/1,
- close/2, info/2, info_keys/0, info_keys/1]).
+ close/2, server_close/2, info/2, info_keys/0, info_keys/1]).
-export([behaviour_info/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
@@ -83,6 +83,9 @@ channels_terminated(Pid) ->
close(Pid, Close) ->
gen_server:call(Pid, {command, {close, Close}}, infinity).
+server_close(Pid, Close) ->
+ gen_server:cast(Pid, {server_close, Close}).
+
info(Pid, Items) ->
gen_server:call(Pid, {info, Items}, infinity).
@@ -176,10 +179,10 @@ handle_call(connect, _From,
{error, _} = Error ->
{stop, {shutdown, Error}, Error, State0}
end;
-handle_call({command, Command}, From, State = #state{closing = Closing}) ->
- case Closing of false -> handle_command(Command, From, State);
- _ -> {reply, closing, State}
- end;
+handle_call({command, Command}, From, State = #state{closing = false}) ->
+ handle_command(Command, From, State);
+handle_call({command, _Command}, _From, State) ->
+ {reply, closing, State};
handle_call({info, Items}, _From, State) ->
{reply, [{Item, i(Item, State)} || Item <- Items], State};
handle_call(info_keys, _From, State = #state{module = Mod}) ->
@@ -195,7 +198,7 @@ after_connect({ServerProperties, ChannelMax, NewMState},
channel_max = ChannelMax,
module_state = NewMState}.
-handle_cast({method, Method, none}, State) ->
+handle_cast({method, Method, none, noflow}, State) ->
handle_method(Method, State);
handle_cast(channels_terminated, State) ->
handle_channels_terminated(State);
@@ -206,7 +209,9 @@ handle_cast({channel_internal_error, Pid, Reason}, State) ->
[self(), Pid, Reason]),
internal_error(State);
handle_cast({server_misbehaved, AmqpError}, State) ->
- server_misbehaved_close(AmqpError, State).
+ server_misbehaved_close(AmqpError, State);
+handle_cast({server_close, #'connection.close'{} = Close}, State) ->
+ server_initiated_close(Close, State).
handle_info(Info, State) ->
callback(handle_message, [Info], State).
View
2  src/amqp_gen_consumer.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
%% @doc A behaviour module for implementing consumers for
View
61 src/amqp_main_reader.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% @private
@@ -60,35 +60,33 @@ handle_call(Call, From, State) ->
handle_cast(Cast, State) ->
{stop, {unexpected_cast, Cast}, State}.
-handle_info({inet_async, _, _, _} = InetAsync, State) ->
- handle_inet_async(InetAsync, State).
+handle_info({inet_async, Sock, _, {ok, <<Type:8, Channel:16, Length:32>>}},
+ State = #state{sock = Sock, message = none}) ->
+ {ok, _Ref} = rabbit_net:async_recv(Sock, Length + 1, infinity),
+ {noreply, State#state{message = {Type, Channel, Length}}};
+handle_info({inet_async, Sock, _, {ok, Data}},
+ State = #state{sock = Sock, message = {Type, Channel, L}}) ->
+ <<Payload:L/binary, ?FRAME_END>> = Data,
+ State1 = process_frame(Type, Channel, Payload, State),
+ {ok, _Ref} = rabbit_net:async_recv(Sock, 7, infinity),
+ {noreply, State1#state{message = none}};
+handle_info({inet_async, Sock, _, {error, closed}},
+ State = #state{sock = Sock, connection = Conn}) ->
+ Conn ! socket_closed,
+ {noreply, State};
+handle_info({inet_async, Sock, _, {error, Reason}},
+ State = #state{sock = Sock, connection = Conn}) ->
+ Conn ! {socket_error, Reason},
+ {stop, {socket_error, Reason}, State}.
%%---------------------------------------------------------------------------
%% Internal plumbing
%%---------------------------------------------------------------------------
-handle_inet_async({inet_async, Sock, _, Msg},
- State = #state{sock = Sock, message = CurMessage}) ->
- {Type, Number, Length} = case CurMessage of {T, N, L} -> {T, N, L};
- none -> {none, none, none}
- end,
- case Msg of
- {ok, <<Payload:Length/binary, ?FRAME_END>>} ->
- State1 = process_frame(Type, Number, Payload, State),
- {ok, _Ref} = rabbit_net:async_recv(Sock, 7, infinity),
- {noreply, State1#state{message = none}};
- {ok, <<NewType:8, NewChannel:16, NewLength:32>>} ->
- {ok, _Ref} = rabbit_net:async_recv(Sock, NewLength + 1, infinity),
- {noreply, State#state{message={NewType, NewChannel, NewLength}}};
- {error, closed} ->
- State#state.connection ! socket_closed,
- {noreply, State};
- {error, Reason} ->
- State#state.connection ! {socket_error, Reason},
- {stop, {socket_error, Reason}, State}
- end.
-
-process_frame(Type, ChNumber, Payload, State = #state{connection = Connection}) ->
+process_frame(Type, ChNumber, Payload,
+ State = #state{connection = Connection,
+ channels_manager = ChMgr,
+ astate = AState}) ->
case rabbit_command_assembler:analyze_frame(Type, Payload, ?PROTOCOL) of
heartbeat when ChNumber /= 0 ->
amqp_gen_connection:server_misbehaved(
@@ -99,13 +97,10 @@ process_frame(Type, ChNumber, Payload, State = #state{connection = Connection})
%% Match heartbeats but don't do anything with them
heartbeat ->
State;
+ AnalyzedFrame when ChNumber /= 0 ->
+ amqp_channels_manager:pass_frame(ChMgr, ChNumber, AnalyzedFrame),
+ State;
AnalyzedFrame ->
- pass_frame(ChNumber, AnalyzedFrame, State)
+ State#state{astate = amqp_channels_manager:process_channel_frame(
+ AnalyzedFrame, 0, Connection, AState)}
end.
-
-pass_frame(0, Frame, State = #state{connection = Conn, astate = AState}) ->
- State#state{astate = rabbit_reader:process_channel_frame(Frame, Conn,
- 0, Conn, AState)};
-pass_frame(Number, Frame, State = #state{channels_manager = ChMgr}) ->
- amqp_channels_manager:pass_frame(ChMgr, Number, Frame),
- State.
View
6 src/amqp_network_connection.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% @private
@@ -245,7 +245,7 @@ client_properties(UserProperties) ->
{<<"version">>, longstr, list_to_binary(Vsn)},
{<<"platform">>, longstr, <<"Erlang">>},
{<<"copyright">>, longstr,
- <<"Copyright (c) 2007-2011 VMware, Inc.">>},
+ <<"Copyright (c) 2007-2012 VMware, Inc.">>},
{<<"information">>, longstr,
<<"Licensed under the MPL. "
"See http://www.rabbitmq.com/">>},
@@ -256,7 +256,7 @@ client_properties(UserProperties) ->
handshake_recv(Expecting) ->
receive
- {'$gen_cast', {method, Method, none}} ->
+ {'$gen_cast', {method, Method, none, noflow}} ->
case {Expecting, element(1, Method)} of
{E, M} when E =:= M ->
Method;
View
2  src/amqp_rpc_client.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% @doc This module allows the simple execution of an asynchronous RPC over
View
2  src/amqp_rpc_server.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% @doc This is a utility module that is used to expose an arbitrary function
View
2  src/amqp_selective_consumer.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2011-2012 VMware, Inc. All rights reserved.
%%
%% @doc This module is an implementation of the amqp_gen_consumer
View
2  src/amqp_sup.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
%% @private
View
2  src/amqp_uri.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
%%
-module(amqp_uri).
View
2  src/uri_parser.erl
@@ -1,7 +1,7 @@
%% This file is a copy of http_uri.erl from the R13B-1 Erlang/OTP
%% distribution with several modifications.
-%% All modifications are Copyright (c) 2009-2011 VMware, Ltd.
+%% All modifications are Copyright (c) 2009-2012 VMware, Ltd.
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
Please sign in to comment.
Something went wrong with that request. Please try again.