Permalink
Browse files

Updated to v2.3.0

  • Loading branch information...
jbrisbin committed Feb 2, 2011
1 parent 06c5933 commit 492a9743c9aee680afe4d06cb517b02c79e2c167
View
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
-include_lib("rabbit_common/include/rabbit.hrl").
@@ -31,6 +31,7 @@
virtual_host = <<"/">>,
host = "localhost",
port = ?PROTOCOL_PORT,
+ node = node(),
channel_max = 0,
frame_max = 0,
heartbeat = 0,
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
-module(amqp_auth_mechanisms).
View
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
%% @doc This module encapsulates the client's view of an AMQP
@@ -33,7 +33,7 @@
-export([close/1, close/3]).
-export([register_return_handler/2]).
-export([register_flow_handler/2]).
--export([register_ack_handler/2]).
+-export([register_confirm_handler/2]).
-export([next_publish_seqno/1]).
-export([register_default_consumer/2]).
@@ -49,7 +49,7 @@
closing = false,
writer,
return_handler_pid = none,
- ack_handler_pid = none,
+ confirm_handler_pid = none,
next_pub_seqno = 0,
flow_active = true,
flow_handler_pid = none,
@@ -184,14 +184,16 @@ subscribe(Channel, BasicConsume = #'basic.consume'{}, Consumer) ->
register_return_handler(Channel, ReturnHandler) ->
gen_server:cast(Channel, {register_return_handler, ReturnHandler} ).
-%% @spec (Channel, AckHandler) -> ok
+%% @spec (Channel, ConfirmHandler) -> ok
%% where
%% Channel = pid()
-%% AckHandler = pid()
-%% @doc This registers a handler to deal with ack'd messages. The
-%% registered process will receive #basic.ack{} commands.
-register_ack_handler(Channel, AckHandler) ->
- gen_server:cast(Channel, {register_ack_handler, AckHandler} ).
+%% ConfirmHandler = pid()
+
+%% @doc This registers a handler to deal with confirm-related
+%% messages. The registered process will receive #basic.ack{} and
+%% #basic.nack{} commands.
+register_confirm_handler(Channel, ConfirmHandler) ->
+ gen_server:cast(Channel, {register_confirm_handler, ConfirmHandler} ).
%% @spec (Channel, FlowHandler) -> ok
%% where
@@ -294,11 +296,11 @@ handle_cast({cast, Method, AmqpMsg}, State) ->
handle_cast({register_return_handler, ReturnHandler}, State) ->
erlang:monitor(process, ReturnHandler),
{noreply, State#state{return_handler_pid = ReturnHandler}};
-%% Registers a handler to process ack messages
+%% Registers a handler to process ack and nack messages
%% @private
-handle_cast({register_ack_handler, AckHandler}, State) ->
- erlang:monitor(process, AckHandler),
- {noreply, State#state{ack_handler_pid = AckHandler}};
+handle_cast({register_confirm_handler, ConfirmHandler}, State) ->
+ erlang:monitor(process, ConfirmHandler),
+ {noreply, State#state{confirm_handler_pid = ConfirmHandler}};
%% Registers a handler to process flow control messages
%% @private
handle_cast({register_flow_handler, FlowHandler}, State) ->
@@ -361,11 +363,11 @@ handle_info({'DOWN', _, process, ReturnHandler, Reason},
"Reason: ~p~n", [self(), ReturnHandler, Reason]),
{noreply, State#state{return_handler_pid = none}};
%% @private
-handle_info({'DOWN', _, process, AckHandler, Reason},
- State = #state{ack_handler_pid = AckHandler}) ->
- ?LOG_WARN("Channel (~p): Unregistering ack handler ~p because it died. "
- "Reason: ~p~n", [self(), AckHandler, Reason]),
- {noreply, State#state{ack_handler_pid = none}};
+handle_info({'DOWN', _, process, ConfirmHandler, Reason},
+ State = #state{confirm_handler_pid = ConfirmHandler}) ->
+ ?LOG_WARN("Channel (~p): Unregistering confirm handler ~p because it died. "
+ "Reason: ~p~n", [self(), ConfirmHandler, Reason]),
+ {noreply, State#state{confirm_handler_pid = none}};
%% @private
handle_info({'DOWN', _, process, FlowHandler, Reason},
State = #state{flow_handler_pid = FlowHandler}) ->
@@ -439,20 +441,29 @@ handle_close(Code, Text, From, State) ->
BlockReply -> {reply, BlockReply, State}
end.
-handle_subscribe(#'basic.consume'{consumer_tag = Tag} = Method, Consumer,
+handle_subscribe(#'basic.consume'{consumer_tag = Tag, nowait = NoWait} = Method,
+ Consumer,
From, State = #state{tagged_sub_requests = Tagged,
anon_sub_requests = Anon,
consumers = Consumers}) ->
case check_block(Method, none, State) of
ok when Tag =:= undefined orelse size(Tag) == 0 ->
- NewMethod = Method#'basic.consume'{consumer_tag = <<"">>},
- NewState = State#state{anon_sub_requests =
- queue:in(Consumer, Anon)},
- {noreply, rpc_top_half(NewMethod, none, From, NewState)};
- ok when is_binary(Tag) ->
+ case NoWait of
+ true ->
+ {reply, {error, command_invalid}, State};
+ false ->
+ NewMethod = Method#'basic.consume'{consumer_tag = <<"">>},
+ NewState = State#state{anon_sub_requests =
+ queue:in(Consumer, Anon)},
+ {noreply, rpc_top_half(NewMethod, none, From, NewState)}
+ end;
+ ok when is_binary(Tag) andalso size(Tag) >= 0 ->
case dict:is_key(Tag, Tagged) orelse dict:is_key(Tag, Consumers) of
true ->
{reply, {error, consumer_tag_already_in_use}, State};
+ false when NoWait ->
+ NewState = register_consumer(Tag, Consumer, State),
+ {reply, ok, rpc_top_half(Method, none, none, NewState)};
false ->
NewState = State#state{tagged_sub_requests =
dict:store(Tag, Consumer, Tagged)},
@@ -532,7 +543,7 @@ handle_method_from_server(Method, Content, State = #state{closing = Closing}) ->
if Drop -> ?LOG_INFO("Channel (~p): dropping method ~p from "
"server because channel is closing~n",
[self(), {Method, Content}]),
- {noreply, State};
+ {noreply, State};
true -> handle_method_from_server1(Method,
amqp_msg(Content), State)
end
@@ -596,14 +607,24 @@ handle_method_from_server1(
end,
{noreply, State};
handle_method_from_server1(#'basic.ack'{} = BasicAck, none,
- #state{ack_handler_pid = none} = State) ->
+ #state{confirm_handler_pid = none} = State) ->
?LOG_WARN("Channel (~p): received ~p but there is no "
- "ack handler registered~n", [self(), BasicAck]),
+ "confirm handler registered~n", [self(), BasicAck]),
{noreply, State};
handle_method_from_server1(#'basic.ack'{} = BasicAck, none,
- #state{ack_handler_pid = AckHandler} = State) ->
- AckHandler ! BasicAck,
+ #state{confirm_handler_pid = ConfirmHandler} = State) ->
+ ConfirmHandler ! BasicAck,
{noreply, State};
+handle_method_from_server1(#'basic.nack'{} = BasicNack, none,
+ #state{confirm_handler_pid = none} = State) ->
+ ?LOG_WARN("Channel (~p): received ~p but there is no "
+ "confirm handler registered~n", [self(), BasicNack]),
+ {noreply, State};
+handle_method_from_server1(#'basic.nack'{} = BasicNack, none,
+ #state{confirm_handler_pid = ConfirmHandler} = State) ->
+ ConfirmHandler ! BasicNack,
+ {noreply, State};
+
handle_method_from_server1(Method, none, State) ->
{noreply, rpc_bottom_half(Method, State)};
handle_method_from_server1(Method, Content, State) ->
View
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
%% @private
@@ -43,40 +43,27 @@ start_link(Type, InfraArgs, ChNumber) ->
%% Internal plumbing
%%---------------------------------------------------------------------------
-start_writer_fun(Sup, direct, [User, VHost, Collector], ChNumber) ->
- fun() ->
- ChPid = self(),
- {ok, _} = supervisor2:start_child(
- Sup,
- {rabbit_channel, {rabbit_channel, start_link,
- [ChNumber, ChPid, ChPid, User, VHost,
- Collector, start_limiter_fun(Sup)]},
- transient, ?MAX_WAIT, worker, [rabbit_channel]})
+start_writer_fun(_Sup, direct, [Node, User, VHost, Collector], ChNumber) ->
+ fun () ->
+ {ok, RabbitCh} =
+ rpc:call(Node, rabbit_direct, start_channel,
+ [ChNumber, self(), User, VHost, Collector]),
+ link(RabbitCh),
+ {ok, RabbitCh}
end;
start_writer_fun(Sup, network, [Sock], ChNumber) ->
- fun() ->
- ChPid = self(),
- {ok, _} = supervisor2:start_child(
- Sup,
- {writer, {rabbit_writer, start_link,
- [Sock, ChNumber, ?FRAME_MIN_SIZE, ?PROTOCOL,
- ChPid]},
- transient, ?MAX_WAIT, worker, [rabbit_writer]})
+ fun () ->
+ {ok, _} = supervisor2:start_child(
+ Sup,
+ {writer, {rabbit_writer, start_link,
+ [Sock, ChNumber, ?FRAME_MIN_SIZE, ?PROTOCOL,
+ self()]},
+ transient, ?MAX_WAIT, worker, [rabbit_writer]})
end.
init_command_assembler(direct) -> {ok, none};
init_command_assembler(network) -> rabbit_command_assembler:init(?PROTOCOL).
-start_limiter_fun(Sup) ->
- fun (UnackedCount) ->
- Parent = self(),
- {ok, _} = supervisor2:start_child(
- Sup,
- {limiter, {rabbit_limiter, start_link,
- [Parent, UnackedCount]},
- transient, ?MAX_WAIT, worker, [rabbit_limiter]})
- end.
-
%%---------------------------------------------------------------------------
%% supervisor2 callbacks
%%---------------------------------------------------------------------------
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
%% @private
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
%% @private
View
@@ -1,6 +1,6 @@
{application, amqp_client,
[{description, "RabbitMQ AMQP Client"},
- {vsn, "2.2.0"},
+ {vsn, "2.3.0"},
{modules, []},
{registered, [amqp_sup]},
{env, []},
View
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
%% @private
View
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
%% @doc This module is responsible for maintaining a connection to an AMQP
@@ -46,17 +46,18 @@
%% <li>virtual_host :: binary() - The name of a virtual host in the broker,
%% defaults to &lt;&lt;"/"&gt;&gt;</li>
%% <li>host :: string() - The hostname of the broker,
-%% defaults to "localhost"</li>
+%% defaults to "localhost" (network only)</li>
%% <li>port :: integer() - The port the broker is listening on,
-%% defaults to 5672</li>
+%% defaults to 5672 (network only)</li>
+%% <li>node :: atom() - The node the broker runs on (direct only)</li>
%% <li>channel_max :: non_neg_integer() - The channel_max handshake parameter,
%% defaults to 0</li>
%% <li>frame_max :: non_neg_integer() - The frame_max handshake parameter,
-%% defaults to 0</li>
+%% defaults to 0 (network only)</li>
%% <li>heartbeat :: non_neg_integer() - The hearbeat interval in seconds,
-%% defaults to 0 (turned off)</li>
+%% defaults to 0 (turned off) (network only)</li>
%% <li>ssl_options :: term() - The second parameter to be used with the
-%% ssl:connect/2 function, defaults to 'none'</li>
+%% ssl:connect/2 function, defaults to 'none' (network only)</li>
%% <li>client_properties :: [{binary(), atom(), binary()}] - A list of extra
%% client properties to be sent to the server, defaults to []</li>
%% </ul>
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
%% @private
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
%%
%% @private
Oops, something went wrong.

0 comments on commit 492a974

Please sign in to comment.