Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Updated to RabbitMQ v3.0.0

  • Loading branch information...
commit 7b2e9a5062eee8faea83f076d951edac9b26798f 1 parent be088cd
@jbrisbin authored
View
4 README.md
@@ -5,7 +5,7 @@ This is a fork of the [official RabbitMQ/AMQP Erlang client](https://github.com/
It's meant to be included in your rebar projects in your rebar.config file:
{deps, [
- {amqp_client, ".*", {git, "git://github.com/jbrisbin/amqp_client.git", {tag, "rabbitmq-2.8.2"}}}
+ {amqp_client, ".*", {git, "git://github.com/jbrisbin/amqp_client.git", {tag, "rabbitmq-3.0.0"}}}
]}.
The "master" branch of this port is a simple re-packaging of the rabbit_common AMQP client dependency.
@@ -16,7 +16,7 @@ To use the "community" branch in your project, which includes stricter compilati
to the version tag:
{deps, [
- {amqp_client, ".*", {git, "git://github.com/jbrisbin/amqp_client.git", {tag, "rabbitmq-2.8.2-community"}}}
+ {amqp_client, ".*", {git, "git://github.com/jbrisbin/amqp_client.git", {tag, "rabbitmq-3.0.0-community"}}}
]}.
### License
View
8 amqp_client.sublime-project
@@ -1,8 +0,0 @@
-{
- "folders":
- [
- {
- "path": "/Users/jbrisbin/src/rabbitmq/amqp_client"
- }
- ]
-}
View
138 amqp_client.sublime-workspace
@@ -1,138 +0,0 @@
-{
- "auto_complete":
- {
- "selected_items":
- [
- ]
- },
- "buffers":
- [
- ],
- "build_system": "",
- "command_palette":
- {
- "height": 327.0,
- "selected_items":
- [
- [
- "Packa",
- "Package Control: Install Package"
- ]
- ],
- "width": 449.0
- },
- "console":
- {
- "height": 229.0
- },
- "distraction_free":
- {
- "menu_visible": true,
- "show_minimap": false,
- "show_open_files": false,
- "show_tabs": false,
- "side_bar_visible": false,
- "status_bar_visible": false
- },
- "file_history":
- [
- "/Users/jbrisbin/Library/Application Support/Sublime Text 2/Packages/Default/Preferences.sublime-settings",
- "/Users/jbrisbin/Library/Application Support/Sublime Text 2/Packages/User/Preferences.sublime-settings"
- ],
- "find":
- {
- "height": 0.0
- },
- "find_in_files":
- {
- "height": 0.0,
- "where_history":
- [
- ]
- },
- "find_state":
- {
- "case_sensitive": false,
- "find_history":
- [
- ],
- "highlight": true,
- "in_selection": false,
- "preserve_case": false,
- "regex": false,
- "replace_history":
- [
- ],
- "reverse": false,
- "show_context": true,
- "use_buffer2": true,
- "whole_word": false,
- "wrap": true
- },
- "groups":
- [
- {
- "sheets":
- [
- ]
- }
- ],
- "incremental_find":
- {
- "height": 0.0
- },
- "input":
- {
- "height": 0.0
- },
- "layout":
- {
- "cells":
- [
- [
- 0,
- 0,
- 1,
- 1
- ]
- ],
- "cols":
- [
- 0.0,
- 1.0
- ],
- "rows":
- [
- 0.0,
- 1.0
- ]
- },
- "menu_visible": true,
- "replace":
- {
- "height": 0.0
- },
- "save_all_on_build": true,
- "select_file":
- {
- "height": 0.0,
- "selected_items":
- [
- ],
- "width": 0.0
- },
- "select_project":
- {
- "height": 0.0,
- "selected_items":
- [
- ],
- "width": 0.0
- },
- "show_minimap": true,
- "show_open_files": true,
- "show_tabs": true,
- "side_bar_visible": true,
- "side_bar_width": 326.0,
- "status_bar_visible": true
-}
View
34 include/amqp_client.hrl
@@ -20,16 +20,6 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").
--define(PROTOCOL_VERSION_MAJOR, 0).
--define(PROTOCOL_VERSION_MINOR, 9).
--define(PROTOCOL_HEADER, <<"AMQP", 0, 0, 9, 1>>).
--define(PROTOCOL, rabbit_framing_amqp_0_9_1).
-
--define(MAX_CHANNEL_NUMBER, 65535).
--define(DEFAULT_CONSUMER, {amqp_selective_consumer, []}).
-
--define(PROTOCOL_SSL_PORT, (?PROTOCOL_PORT - 1)).
-
-record(amqp_msg, {props = #'P_basic'{}, payload = <<>>}).
-record(amqp_params_network, {username = <<"guest">>,
@@ -49,26 +39,18 @@
socket_options = []}).
-record(amqp_params_direct, {username = <<"guest">>,
+ password = none,
virtual_host = <<"/">>,
node = node(),
adapter_info = none,
client_properties = []}).
--record(adapter_info, {address = unknown,
- port = unknown,
- peer_address = unknown,
- peer_port = unknown,
- name = unknown,
- protocol = unknown,
- additional_info = []}).
-
--define(LOG_DEBUG(Format), error_logger:info_msg(Format)).
--define(LOG_INFO(Format, Args), error_logger:info_msg(Format, Args)).
--define(LOG_WARN(Format, Args), error_logger:warning_msg(Format, Args)).
-
--define(CLIENT_CAPABILITIES, [{<<"publisher_confirms">>, bool, true},
- {<<"exchange_exchange_bindings">>, bool, true},
- {<<"basic.nack">>, bool, true},
- {<<"consumer_cancel_notify">>, bool, true}]).
+-record(amqp_adapter_info, {address = unknown,
+ port = unknown,
+ peer_address = unknown,
+ peer_port = unknown,
+ name = unknown,
+ protocol = unknown,
+ additional_info = []}).
-endif.
View
34 include/amqp_client_internal.hrl
@@ -0,0 +1,34 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-include("amqp_client.hrl").
+
+-define(PROTOCOL_VERSION_MAJOR, 0).
+-define(PROTOCOL_VERSION_MINOR, 9).
+-define(PROTOCOL_HEADER, <<"AMQP", 0, 0, 9, 1>>).
+-define(PROTOCOL, rabbit_framing_amqp_0_9_1).
+
+-define(MAX_CHANNEL_NUMBER, 65535).
+
+-define(LOG_DEBUG(Format), error_logger:info_msg(Format)).
+-define(LOG_INFO(Format, Args), error_logger:info_msg(Format, Args)).
+-define(LOG_WARN(Format, Args), error_logger:warning_msg(Format, Args)).
+-define(LOG_ERR(Format, Args), error_logger:error_msg(Format, Args)).
+
+-define(CLIENT_CAPABILITIES, [{<<"publisher_confirms">>, bool, true},
+ {<<"exchange_exchange_bindings">>, bool, true},
+ {<<"basic.nack">>, bool, true},
+ {<<"consumer_cancel_notify">>, bool, true}]).
View
BIN  rebar
Binary file not shown
View
2  rebar.config
@@ -1,5 +1,5 @@
{deps, [
- {rabbit_common, ".*", {git, "git://github.com/jbrisbin/rabbit_common.git", {tag, "rabbitmq-2.8.2"}}}
+ {rabbit_common, ".*", {git, "git://github.com/jbrisbin/rabbit_common.git", {tag, "rabbitmq-3.0.0"}}}
]}.
{erl_opts, [
View
169 src/amqp_channel.erl
@@ -62,14 +62,15 @@
%% See type definitions below.
-module(amqp_channel).
--include("amqp_client.hrl").
+-include("amqp_client_internal.hrl").
-behaviour(gen_server).
-export([call/2, call/3, cast/2, cast/3, cast_flow/3]).
-export([close/1, close/3]).
--export([register_return_handler/2, register_flow_handler/2,
- register_confirm_handler/2]).
+-export([register_return_handler/2, unregister_return_handler/1,
+ register_flow_handler/2, unregister_flow_handler/1,
+ register_confirm_handler/2, unregister_confirm_handler/1]).
-export([call_consumer/2, subscribe/3]).
-export([next_publish_seqno/1, wait_for_confirms/1, wait_for_confirms/2,
wait_for_confirms_or_die/1, wait_for_confirms_or_die/2]).
@@ -84,20 +85,20 @@
connection,
consumer,
driver,
- rpc_requests = queue:new(),
- closing = false, %% false |
- %% {just_channel, Reason} |
- %% {connection, Reason}
+ rpc_requests = queue:new(),
+ closing = false, %% false |
+ %% {just_channel, Reason} |
+ %% {connection, Reason}
writer,
- return_handler_pid = none,
- confirm_handler_pid = none,
- next_pub_seqno = 0,
- flow_active = true,
- flow_handler_pid = none,
+ return_handler = none,
+ confirm_handler = none,
+ next_pub_seqno = 0,
+ flow_active = true,
+ flow_handler = none,
start_writer_fun,
- unconfirmed_set = gb_sets:new(),
- waiting_set = gb_trees:empty(),
- only_acks_received = true
+ unconfirmed_set = gb_sets:new(),
+ waiting_set = gb_trees:empty(),
+ only_acks_received = true
}).
%%---------------------------------------------------------------------------
@@ -263,6 +264,14 @@ wait_for_confirms_or_die(Channel, Timeout) ->
register_return_handler(Channel, ReturnHandler) ->
gen_server:cast(Channel, {register_return_handler, ReturnHandler} ).
+%% @spec (Channel) -> ok
+%% where
+%% Channel = pid()
+%% @doc Removes the return handler, if it exists. Does nothing if there is no
+%% such handler.
+unregister_return_handler(Channel) ->
+ gen_server:cast(Channel, unregister_return_handler).
+
%% @spec (Channel, ConfirmHandler) -> ok
%% where
%% Channel = pid()
@@ -274,6 +283,14 @@ register_return_handler(Channel, ReturnHandler) ->
register_confirm_handler(Channel, ConfirmHandler) ->
gen_server:cast(Channel, {register_confirm_handler, ConfirmHandler} ).
+%% @spec (Channel) -> ok
+%% where
+%% Channel = pid()
+%% @doc Removes the confirm handler, if it exists. Does nothing if there is no
+%% such handler.
+unregister_confirm_handler(Channel) ->
+ gen_server:cast(Channel, unregister_confirm_handler).
+
%% @spec (Channel, FlowHandler) -> ok
%% where
%% Channel = pid()
@@ -283,6 +300,14 @@ register_confirm_handler(Channel, ConfirmHandler) ->
register_flow_handler(Channel, FlowHandler) ->
gen_server:cast(Channel, {register_flow_handler, FlowHandler} ).
+%% @spec (Channel) -> ok
+%% where
+%% Channel = pid()
+%% @doc Removes the flow handler, if it exists. Does nothing if there is no
+%% such handler.
+unregister_flow_handler(Channel) ->
+ gen_server:cast(Channel, unregister_flow_handler).
+
%% @spec (Channel, Msg) -> ok
%% where
%% Channel = pid()
@@ -376,16 +401,31 @@ handle_cast({cast, Method, AmqpMsg, Sender, flow}, State) ->
handle_method_to_server(Method, AmqpMsg, none, Sender, flow, State);
%% @private
handle_cast({register_return_handler, ReturnHandler}, State) ->
- erlang:monitor(process, ReturnHandler),
- {noreply, State#state{return_handler_pid = ReturnHandler}};
+ Ref = erlang:monitor(process, ReturnHandler),
+ {noreply, State#state{return_handler = {ReturnHandler, Ref}}};
+%% @private
+handle_cast(unregister_return_handler,
+ State = #state{return_handler = {_ReturnHandler, Ref}}) ->
+ erlang:demonitor(Ref),
+ {noreply, State#state{return_handler = none}};
%% @private
handle_cast({register_confirm_handler, ConfirmHandler}, State) ->
- erlang:monitor(process, ConfirmHandler),
- {noreply, State#state{confirm_handler_pid = ConfirmHandler}};
+ Ref = erlang:monitor(process, ConfirmHandler),
+ {noreply, State#state{confirm_handler = {ConfirmHandler, Ref}}};
+%% @private
+handle_cast(unregister_confirm_handler,
+ State = #state{confirm_handler = {_ConfirmHandler, Ref}}) ->
+ erlang:demonitor(Ref),
+ {noreply, State#state{confirm_handler = none}};
%% @private
handle_cast({register_flow_handler, FlowHandler}, State) ->
- erlang:monitor(process, FlowHandler),
- {noreply, State#state{flow_handler_pid = FlowHandler}};
+ Ref = erlang:monitor(process, FlowHandler),
+ {noreply, State#state{flow_handler = {FlowHandler, Ref}}};
+%% @private
+handle_cast(unregister_flow_handler,
+ State = #state{flow_handler = {_FlowHandler, Ref}}) ->
+ erlang:demonitor(Ref),
+ {noreply, State#state{flow_handler = none}};
%% Received from channels manager
%% @private
handle_cast({method, Method, Content, noflow}, State) ->
@@ -433,22 +473,22 @@ handle_info(timed_out_flushing_channel, State) ->
{stop, timed_out_flushing_channel, State};
%% @private
handle_info({'DOWN', _, process, ReturnHandler, Reason},
- State = #state{return_handler_pid = ReturnHandler}) ->
+ State = #state{return_handler = {ReturnHandler, _Ref}}) ->
?LOG_WARN("Channel (~p): Unregistering return handler ~p because it died. "
"Reason: ~p~n", [self(), ReturnHandler, Reason]),
- {noreply, State#state{return_handler_pid = none}};
+ {noreply, State#state{return_handler = none}};
%% @private
handle_info({'DOWN', _, process, ConfirmHandler, Reason},
- State = #state{confirm_handler_pid = ConfirmHandler}) ->
+ State = #state{confirm_handler = {ConfirmHandler, _Ref}}) ->
?LOG_WARN("Channel (~p): Unregistering confirm handler ~p because it died. "
"Reason: ~p~n", [self(), ConfirmHandler, Reason]),
- {noreply, State#state{confirm_handler_pid = none}};
+ {noreply, State#state{confirm_handler = none}};
%% @private
handle_info({'DOWN', _, process, FlowHandler, Reason},
- State = #state{flow_handler_pid = FlowHandler}) ->
+ State = #state{flow_handler = {FlowHandler, _Ref}}) ->
?LOG_WARN("Channel (~p): Unregistering flow handler ~p because it died. "
"Reason: ~p~n", [self(), FlowHandler, Reason]),
- {noreply, State#state{flow_handler_pid = none}};
+ {noreply, State#state{flow_handler = none}};
handle_info({'DOWN', _, process, QPid, _Reason}, State) ->
rabbit_amqqueue:notify_sent_queue_down(QPid),
{noreply, State};
@@ -648,9 +688,9 @@ handle_method_from_server1(#'basic.deliver'{} = Deliver, AmqpMsg, State) ->
ok = call_to_consumer(Deliver, AmqpMsg, State),
{noreply, State};
handle_method_from_server1(#'channel.flow'{active = Active} = Flow, none,
- State = #state{flow_handler_pid = FlowHandler}) ->
- case FlowHandler of none -> ok;
- _ -> FlowHandler ! Flow
+ State = #state{flow_handler = FlowHandler}) ->
+ case FlowHandler of none -> ok;
+ {Pid, _Ref} -> Pid ! Flow
end,
%% Putting the flow_ok in the queue so that the RPC queue can be
%% flushed beforehand. Methods that made it to the queue are not
@@ -659,32 +699,32 @@ handle_method_from_server1(#'channel.flow'{active = Active} = Flow, none,
none, noflow, State#state{flow_active = Active})};
handle_method_from_server1(
#'basic.return'{} = BasicReturn, AmqpMsg,
- State = #state{return_handler_pid = ReturnHandler}) ->
+ State = #state{return_handler = ReturnHandler}) ->
case ReturnHandler of
- none -> ?LOG_WARN("Channel (~p): received {~p, ~p} but there is no "
- "return handler registered~n",
- [self(), BasicReturn, AmqpMsg]);
- _ -> ReturnHandler ! {BasicReturn, AmqpMsg}
+ none -> ?LOG_WARN("Channel (~p): received {~p, ~p} but there is "
+ "no return handler registered~n",
+ [self(), BasicReturn, AmqpMsg]);
+ {Pid, _Ref} -> Pid ! {BasicReturn, AmqpMsg}
end,
{noreply, State};
handle_method_from_server1(#'basic.ack'{} = BasicAck, none,
- #state{confirm_handler_pid = none} = State) ->
+ #state{confirm_handler = none} = State) ->
?LOG_WARN("Channel (~p): received ~p but there is no "
"confirm handler registered~n", [self(), BasicAck]),
{noreply, update_confirm_set(BasicAck, State)};
handle_method_from_server1(
#'basic.ack'{} = BasicAck, none,
- #state{confirm_handler_pid = ConfirmHandler} = State) ->
+ #state{confirm_handler = {ConfirmHandler, _Ref}} = State) ->
ConfirmHandler ! BasicAck,
{noreply, update_confirm_set(BasicAck, State)};
handle_method_from_server1(#'basic.nack'{} = BasicNack, none,
- #state{confirm_handler_pid = none} = State) ->
+ #state{confirm_handler = none} = State) ->
?LOG_WARN("Channel (~p): received ~p but there is no "
"confirm handler registered~n", [self(), BasicNack]),
{noreply, update_confirm_set(BasicNack, State)};
handle_method_from_server1(
#'basic.nack'{} = BasicNack, none,
- #state{confirm_handler_pid = ConfirmHandler} = State) ->
+ #state{confirm_handler = {ConfirmHandler, _Ref}} = State) ->
ConfirmHandler ! BasicNack,
{noreply, update_confirm_set(BasicNack, State)};
@@ -712,25 +752,19 @@ handle_connection_closing(CloseType, Reason,
handle_shutdown({connection_closing, Reason}, NewState)
end.
-handle_channel_exit(Reason, State = #state{connection = Connection}) ->
- case Reason of
- %% Sent by rabbit_channel in the direct case
- #amqp_error{name = ErrorName, explanation = Expl} ->
- ?LOG_WARN("Channel (~p) closing: server sent error ~p~n",
- [self(), Reason]),
- {IsHard, Code, _} = ?PROTOCOL:lookup_amqp_exception(ErrorName),
- ReportedReason = {server_initiated_close, Code, Expl},
- handle_shutdown(
- if IsHard ->
- amqp_gen_connection:hard_error_in_channel(
- Connection, self(), ReportedReason),
- {connection_closing, ReportedReason};
- true -> ReportedReason
- end, State);
- %% Unexpected death of a channel infrastructure process
- _ ->
- {stop, {infrastructure_died, Reason}, State}
- end.
+handle_channel_exit(Reason = #amqp_error{name = ErrorName, explanation = Expl},
+ State = #state{connection = Connection, number = Number}) ->
+ %% Sent by rabbit_channel for hard errors in the direct case
+ ?LOG_ERR("connection ~p, channel ~p - error:~n~p~n",
+ [Connection, Number, Reason]),
+ {true, Code, _} = ?PROTOCOL:lookup_amqp_exception(ErrorName),
+ ReportedReason = {server_initiated_close, Code, Expl},
+ amqp_gen_connection:hard_error_in_channel(
+ Connection, self(), ReportedReason),
+ handle_shutdown({connection_closing, ReportedReason}, State);
+handle_channel_exit(Reason, State) ->
+ %% Unexpected death of a channel infrastructure process
+ {stop, {infrastructure_died, Reason}, State}.
handle_shutdown({_, 200, _}, State) ->
{stop, normal, State};
@@ -854,16 +888,15 @@ handle_wait_for_confirms(From, Timeout,
State = #state{unconfirmed_set = USet,
waiting_set = WSet}) ->
case gb_sets:is_empty(USet) of
- true ->
- {reply, true, State};
- false ->
- TRef = case Timeout of
- infinity -> undefined;
- _ -> erlang:send_after(Timeout * 1000, self(),
- {confirm_timeout, From})
- end,
- {noreply,
- State#state{waiting_set = gb_trees:insert(From, TRef, WSet)}}
+ true -> {reply, true, State};
+ false -> TRef = case Timeout of
+ infinity -> undefined;
+ _ -> erlang:send_after(
+ Timeout * 1000, self(),
+ {confirm_timeout, From})
+ end,
+ {noreply,
+ State#state{waiting_set = gb_trees:insert(From, TRef, WSet)}}
end.
call_to_consumer(Method, Args, #state{consumer = Consumer}) ->
View
2  src/amqp_channel_sup.erl
@@ -17,7 +17,7 @@
%% @private
-module(amqp_channel_sup).
--include("amqp_client.hrl").
+-include("amqp_client_internal.hrl").
-behaviour(supervisor2).
View
4 src/amqp_channels_manager.erl
@@ -17,7 +17,7 @@
%% @private
-module(amqp_channels_manager).
--include("amqp_client.hrl").
+-include("amqp_client_internal.hrl").
-behaviour(gen_server).
@@ -84,7 +84,7 @@ terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
- State.
+ {ok, State}.
handle_call({open_channel, ProposedNumber, Consumer, InfraArgs}, _,
State = #state{closing = false}) ->
View
2  src/amqp_client.app.src
@@ -1,6 +1,6 @@
{application, amqp_client,
[{description, "RabbitMQ Erlang Client Library"},
- {vsn, "2.8.2"},
+ {vsn, "3.0.0"},
{modules, []},
{registered, []},
{env, []},
View
64 src/amqp_connection.erl
@@ -67,27 +67,32 @@
%% See type definitions below.
-module(amqp_connection).
--include("amqp_client.hrl").
+-include("amqp_client_internal.hrl").
-export([open_channel/1, open_channel/2, open_channel/3]).
--export([start/1]).
--export([close/1, close/3]).
+-export([start/1, close/1, close/2, close/3]).
+-export([error_atom/1]).
-export([info/2, info_keys/1, info_keys/0]).
+-define(DEFAULT_CONSUMER, {amqp_selective_consumer, []}).
+
+-define(PROTOCOL_SSL_PORT, (?PROTOCOL_PORT - 1)).
+
%%---------------------------------------------------------------------------
%% Type Definitions
%%---------------------------------------------------------------------------
-%% @type adapter_info() = #adapter_info{}.
+%% @type amqp_adapter_info() = #amqp_adapter_info{}.
%% @type amqp_params_direct() = #amqp_params_direct{}.
%% As defined in amqp_client.hrl. It contains the following fields:
%% <ul>
%% <li>username :: binary() - The name of a user registered with the broker,
%% defaults to &lt;&lt;guest"&gt;&gt;</li>
+%% <li>password :: binary() - The password of user, defaults to 'none'</li>
%% <li>virtual_host :: binary() - The name of a virtual host in the broker,
%% defaults to &lt;&lt;"/"&gt;&gt;</li>
%% <li>node :: atom() - The node the broker runs on (direct only)</li>
-%% <li>adapter_info :: adapter_info() - Extra management information for if
+%% <li>adapter_info :: amqp_adapter_info() - Extra management information for if
%% this connection represents a non-AMQP network connection.</li>
%% <li>client_properties :: [{binary(), atom(), binary()}] - A list of extra
%% client properties to be sent to the server, defaults to []</li>
@@ -163,9 +168,10 @@ start(AmqpParams) ->
%% Commands
%%---------------------------------------------------------------------------
-%% @doc Invokes open_channel(ConnectionPid, none, ?DEFAULT_CONSUMER).
-%% Opens a channel without having to specify a channel number. This uses the
-%% default consumer implementation.
+%% @doc Invokes open_channel(ConnectionPid, none,
+%% {amqp_selective_consumer, []}). Opens a channel without having to
+%% specify a channel number. This uses the default consumer
+%% implementation.
open_channel(ConnectionPid) ->
open_channel(ConnectionPid, none, ?DEFAULT_CONSUMER).
@@ -174,8 +180,9 @@ open_channel(ConnectionPid) ->
open_channel(ConnectionPid, {_, _} = Consumer) ->
open_channel(ConnectionPid, none, Consumer);
-%% @doc Invokes open_channel(ConnectionPid, ChannelNumber, ?DEFAULT_CONSUMER).
-%% Opens a channel, using the default consumer implementation.
+%% @doc Invokes open_channel(ConnectionPid, ChannelNumber,
+%% {amqp_selective_consumer, []}). Opens a channel, using the default
+%% consumer implementation.
open_channel(ConnectionPid, ChannelNumber)
when is_number(ChannelNumber) orelse ChannelNumber =:= none ->
open_channel(ConnectionPid, ChannelNumber, ?DEFAULT_CONSUMER).
@@ -196,9 +203,9 @@ open_channel(ConnectionPid, ChannelNumber)
%% is passed as parameter to ConsumerModule:init/1.<br/>
%% This function assumes that an AMQP connection (networked or direct)
%% has already been successfully established.<br/>
-%% ChannelNumber must be less than or equal to the negotiated max_channel value,
-%% or less than or equal to ?MAX_CHANNEL_NUMBER if the negotiated max_channel
-%% value is 0.<br/>
+%% ChannelNumber must be less than or equal to the negotiated
+%% max_channel value, or less than or equal to ?MAX_CHANNEL_NUMBER
+%% (65535) if the negotiated max_channel value is 0.<br/>
%% In the direct connection, max_channel is always 0.
open_channel(ConnectionPid, ChannelNumber,
{_ConsumerModule, _ConsumerArgs} = Consumer) ->
@@ -212,6 +219,14 @@ open_channel(ConnectionPid, ChannelNumber,
close(ConnectionPid) ->
close(ConnectionPid, 200, <<"Goodbye">>).
+%% @spec (ConnectionPid, Timeout) -> ok | Error
+%% where
+%% ConnectionPid = pid()
+%% Timeout = integer()
+%% @doc Closes the channel, using the supplied Timeout value.
+close(ConnectionPid, Timeout) ->
+ close(ConnectionPid, 200, <<"Goodbye">>, Timeout).
+
%% @spec (ConnectionPid, Code, Text) -> ok | closing
%% where
%% ConnectionPid = pid()
@@ -220,16 +235,35 @@ close(ConnectionPid) ->
%% @doc Closes the AMQP connection, allowing the caller to set the reply
%% code and text.
close(ConnectionPid, Code, Text) ->
- Close = #'connection.close'{reply_text = Text,
+ close(ConnectionPid, Code, Text, infinity).
+
+%% @spec (ConnectionPid, Code, Text, Timeout) -> ok | closing
+%% where
+%% ConnectionPid = pid()
+%% Code = integer()
+%% Text = binary()
+%% Timeout = integer()
+%% @doc Closes the AMQP connection, allowing the caller to set the reply
+%% code and text, as well as a timeout for the operation, after which the
+%% connection will be abruptly terminated.
+close(ConnectionPid, Code, Text, Timeout) ->
+ Close = #'connection.close'{reply_text = Text,
reply_code = Code,
class_id = 0,
method_id = 0},
- amqp_gen_connection:close(ConnectionPid, Close).
+ amqp_gen_connection:close(ConnectionPid, Close, Timeout).
%%---------------------------------------------------------------------------
%% Other functions
%%---------------------------------------------------------------------------
+%% @spec (Code) -> atom()
+%% where
+%% Code = integer()
+%% @doc Returns a descriptive atom corresponding to the given AMQP
+%% error code.
+error_atom(Code) -> ?PROTOCOL:amqp_exception(Code).
+
%% @spec (ConnectionPid, Items) -> ResultList
%% where
%% ConnectionPid = pid()
View
2  src/amqp_connection_sup.erl
@@ -70,7 +70,7 @@ start_infrastructure_fun(Sup, direct) ->
Sup,
{connection_type_sup, {amqp_connection_type_sup,
start_link_direct, []},
- transient, infinity, supervisor,
+ intrinsic, infinity, supervisor,
[amqp_connection_type_sup]}),
{ok, Collector}
end.
View
2  src/amqp_connection_type_sup.erl
@@ -17,7 +17,7 @@
%% @private
-module(amqp_connection_type_sup).
--include("amqp_client.hrl").
+-include("amqp_client_internal.hrl").
-behaviour(supervisor2).
View
43 src/amqp_direct_connection.erl
@@ -17,7 +17,7 @@
%% @private
-module(amqp_direct_connection).
--include("amqp_client.hrl").
+-include("amqp_client_internal.hrl").
-behaviour(amqp_gen_connection).
@@ -60,7 +60,7 @@ open_channel_args(#state{node = Node,
vhost = VHost,
adapter_info = Info,
collector = Collector}) ->
- [self(), Info#adapter_info.name, Node, User, VHost, Collector].
+ [self(), Info#amqp_adapter_info.name, Node, User, VHost, Collector].
do(_Method, _State) ->
ok.
@@ -69,7 +69,8 @@ handle_message(force_event_refresh, State = #state{node = Node}) ->
rpc:call(Node, rabbit_event, notify,
[connection_created, connection_info(State)]),
{ok, State};
-
+handle_message(closing_timeout, State = #state{closing_reason = Reason}) ->
+ {stop, {closing_timeout, Reason}, State};
handle_message(Msg, State) ->
{stop, {unexpected_msg, Msg}, State}.
@@ -88,17 +89,18 @@ terminate(_Reason, #state{node = Node}) ->
i(type, _State) -> direct;
i(pid, _State) -> self();
%% AMQP Params
-i(user, #state{params = P}) -> P#amqp_params_direct.username;
+i(user, #state{params=#amqp_params_direct{username=#user{username=U}}}) -> U;
+i(user, #state{params=#amqp_params_direct{username=U}}) -> U;
i(vhost, #state{params = P}) -> P#amqp_params_direct.virtual_host;
i(client_properties, #state{params = P}) ->
P#amqp_params_direct.client_properties;
%% Optional adapter info
-i(protocol, #state{adapter_info = I}) -> I#adapter_info.protocol;
-i(address, #state{adapter_info = I}) -> I#adapter_info.address;
-i(port, #state{adapter_info = I}) -> I#adapter_info.port;
-i(peer_address, #state{adapter_info = I}) -> I#adapter_info.peer_address;
-i(peer_port, #state{adapter_info = I}) -> I#adapter_info.peer_port;
-i(name, #state{adapter_info = I}) -> I#adapter_info.name;
+i(protocol, #state{adapter_info = I}) -> I#amqp_adapter_info.protocol;
+i(address, #state{adapter_info = I}) -> I#amqp_adapter_info.address;
+i(port, #state{adapter_info = I}) -> I#amqp_adapter_info.port;
+i(peer_address, #state{adapter_info = I}) -> I#amqp_adapter_info.peer_address;
+i(peer_port, #state{adapter_info = I}) -> I#amqp_adapter_info.peer_port;
+i(name, #state{adapter_info = I}) -> I#amqp_adapter_info.name;
i(Item, _State) -> throw({bad_argument, Item}).
@@ -109,9 +111,10 @@ infos(Items, State) ->
[{Item, i(Item, State)} || Item <- Items].
connection_info(State = #state{adapter_info = I}) ->
- infos(?CREATION_EVENT_KEYS, State) ++ I#adapter_info.additional_info.
+ infos(?CREATION_EVENT_KEYS, State) ++ I#amqp_adapter_info.additional_info.
connect(Params = #amqp_params_direct{username = Username,
+ password = Password,
node = Node,
adapter_info = Info,
virtual_host = VHost},
@@ -120,8 +123,12 @@ connect(Params = #amqp_params_direct{username = Username,
vhost = VHost,
params = Params,
adapter_info = ensure_adapter_info(Info)},
+ AuthToken = case Password of
+ none -> Username;
+ _ -> {Username, Password}
+ end,
case rpc:call(Node, rabbit_direct, connect,
- [Username, VHost, ?PROTOCOL, self(),
+ [AuthToken, VHost, ?PROTOCOL, self(),
connection_info(State1)]) of
{ok, {User, ServerProperties}} ->
{ok, Collector} = SIF(),
@@ -135,14 +142,14 @@ connect(Params = #amqp_params_direct{username = Username,
end.
ensure_adapter_info(none) ->
- ensure_adapter_info(#adapter_info{});
+ ensure_adapter_info(#amqp_adapter_info{});
-ensure_adapter_info(A = #adapter_info{protocol = unknown}) ->
- ensure_adapter_info(A#adapter_info{protocol =
- {'Direct', ?PROTOCOL:version()}});
+ensure_adapter_info(A = #amqp_adapter_info{protocol = unknown}) ->
+ ensure_adapter_info(A#amqp_adapter_info{
+ protocol = {'Direct', ?PROTOCOL:version()}});
-ensure_adapter_info(A = #adapter_info{name = unknown}) ->
+ensure_adapter_info(A = #amqp_adapter_info{name = unknown}) ->
Name = list_to_binary(rabbit_misc:pid_to_string(self())),
- ensure_adapter_info(A#adapter_info{name = Name});
+ ensure_adapter_info(A#amqp_adapter_info{name = Name});
ensure_adapter_info(Info) -> Info.
View
24 src/amqp_gen_connection.erl
@@ -17,13 +17,13 @@
%% @private
-module(amqp_gen_connection).
--include("amqp_client.hrl").
+-include("amqp_client_internal.hrl").
-behaviour(gen_server).
-export([start_link/5, connect/1, open_channel/3, hard_error_in_channel/3,
channel_internal_error/3, server_misbehaved/2, channels_terminated/1,
- close/2, server_close/2, info/2, info_keys/0, info_keys/1]).
+ close/3, server_close/2, info/2, info_keys/0, info_keys/1]).
-export([behaviour_info/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
@@ -80,8 +80,8 @@ server_misbehaved(Pid, AmqpError) ->
channels_terminated(Pid) ->
gen_server:cast(Pid, channels_terminated).
-close(Pid, Close) ->
- gen_server:call(Pid, {command, {close, Close}}, infinity).
+close(Pid, Close, Timeout) ->
+ gen_server:call(Pid, {command, {close, Close, Timeout}}, infinity).
server_close(Pid, Close) ->
gen_server:cast(Pid, {server_close, Close}).
@@ -154,6 +154,10 @@ callback(Function, Params, State = #state{module = Mod,
%%---------------------------------------------------------------------------
init([Mod, Sup, AmqpParams, SIF, SChMF, ExtraParams]) ->
+ %% Trapping exits since we need to make sure that the `terminate/2' is
+ %% called in the case of direct connection (it does not matter for a network
+ %% connection). See bug25116.
+ process_flag(trap_exit, true),
{ok, MState} = Mod:init(ExtraParams),
{ok, #state{module = Mod,
module_state = MState,
@@ -220,7 +224,7 @@ terminate(Reason, #state{module = Mod, module_state = MState}) ->
Mod:terminate(Reason, MState).
code_change(_OldVsn, State, _Extra) ->
- State.
+ {ok, State}.
%%---------------------------------------------------------------------------
%% Infos
@@ -245,8 +249,8 @@ handle_command({open_channel, ProposedNumber, Consumer}, _From,
{reply, amqp_channels_manager:open_channel(ChMgr, ProposedNumber, Consumer,
Mod:open_channel_args(MState)),
State};
-handle_command({close, #'connection.close'{} = Close}, From, State) ->
- app_initiated_close(Close, From, State).
+handle_command({close, #'connection.close'{} = Close, Timeout}, From, State) ->
+ app_initiated_close(Close, From, Timeout, State).
%%---------------------------------------------------------------------------
%% Handling methods from broker
@@ -270,7 +274,11 @@ handle_method(Other, State) ->
%% Closing
%%---------------------------------------------------------------------------
-app_initiated_close(Close, From, State) ->
+app_initiated_close(Close, From, Timeout, State) ->
+ case Timeout of
+ infinity -> ok;
+ _ -> erlang:send_after(Timeout, self(), closing_timeout)
+ end,
set_closing_state(flush, #closing{reason = app_initiated_close,
close = Close,
from = From}, State).
View
5 src/amqp_gen_consumer.erl
@@ -241,12 +241,11 @@ handle_info(Info, State = #state{module_state = MState,
{ok, NewMState} ->
{noreply, State#state{module_state = NewMState}};
{error, Reason, NewMState} ->
- {stop, {error, Reason}, {error, Reason},
- State#state{module_state = NewMState}}
+ {stop, {error, Reason}, State#state{module_state = NewMState}}
end.
terminate(Reason, #state{module = ConsumerModule, module_state = MState}) ->
ConsumerModule:terminate(Reason, MState).
code_change(_OldVsn, State, _Extra) ->
- State.
+ {ok, State}.
View
4 src/amqp_main_reader.erl
@@ -17,7 +17,7 @@
%% @private
-module(amqp_main_reader).
--include("amqp_client.hrl").
+-include("amqp_client_internal.hrl").
-behaviour(gen_server).
@@ -54,7 +54,7 @@ terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
- State.
+ {ok, State}.
handle_call(Call, From, State) ->
{stop, {unexpected_call, Call, From}, State}.
View
35 src/amqp_network_connection.erl
@@ -17,10 +17,9 @@
%% @private
-module(amqp_network_connection).
--include("amqp_client.hrl").
+-include("amqp_client_internal.hrl").
-behaviour(amqp_gen_connection).
-
-export([init/1, terminate/2, connect/4, do/2, open_channel_args/1, i/2,
info_keys/0, handle_message/2, closing/3, channels_terminated/1]).
@@ -68,7 +67,19 @@ handle_message({socket_error, _} = SocketError, State) ->
handle_message({channel_exit, Reason}, State) ->
{stop, {channel0_died, Reason}, State};
handle_message(heartbeat_timeout, State) ->
- {stop, heartbeat_timeout, State}.
+ {stop, heartbeat_timeout, State};
+handle_message(closing_timeout, State = #state{closing_reason = Reason}) ->
+ {stop, Reason, State};
+%% see http://erlang.org/pipermail/erlang-bugs/2012-June/002933.html
+handle_message({Ref, {error, Reason}},
+ State = #state{waiting_socket_close = Waiting,
+ closing_reason = CloseReason})
+ when is_reference(Ref) ->
+ {stop, case {Reason, Waiting} of
+ {closed, true} -> {shutdown, CloseReason};
+ {closed, false} -> socket_closed_unexpectedly;
+ {_, _} -> {socket_error, Reason}
+ end, State}.
closing(_ChannelCloseType, Reason, State) ->
{ok, State#state{closing_reason = Reason}}.
@@ -107,6 +118,7 @@ do_connect({Addr, Family},
connection_timeout = Timeout,
socket_options = ExtraOpts},
SIF, ChMgr, State) ->
+ obtain(),
case gen_tcp:connect(Addr, Port,
[Family | ?RABBIT_TCP_OPTS] ++ ExtraOpts,
Timeout) of
@@ -120,7 +132,8 @@ do_connect({Addr, Family},
connection_timeout = Timeout,
socket_options = ExtraOpts},
SIF, ChMgr, State) ->
- rabbit_misc:start_applications([crypto, public_key, ssl]),
+ app_utils:start_applications([crypto, public_key, ssl]),
+ obtain(),
case gen_tcp:connect(Addr, Port,
[Family | ?RABBIT_TCP_OPTS] ++ ExtraOpts,
Timeout) of
@@ -137,9 +150,15 @@ do_connect({Addr, Family},
E
end.
+inet_address_preference() ->
+ case application:get_env(amqp_client, prefer_ipv6) of
+ {ok, true} -> [inet6, inet];
+ {ok, false} -> [inet, inet6]
+ end.
+
gethostaddr(Host) ->
Lookups = [{Family, inet:getaddr(Host, Family)}
- || Family <- [inet, inet6]],
+ || Family <- inet_address_preference()],
[{IP, Family} || {Family, {ok, IP}} <- Lookups].
try_handshake(AmqpParams, SIF, ChMgr, State) ->
@@ -296,3 +315,9 @@ handshake_recv(Expecting) ->
exit(handshake_receive_timed_out)
end
end.
+
+obtain() ->
+ case code:is_loaded(file_handle_cache) of
+ false -> ok;
+ _ -> file_handle_cache:obtain()
+ end.
View
2  src/amqp_rpc_client.erl
@@ -168,4 +168,4 @@ handle_info({#'basic.deliver'{delivery_tag = DeliveryTag},
%% @private
code_change(_OldVsn, State, _Extra) ->
- State.
+ {ok, State}.
View
2  src/amqp_rpc_server.erl
@@ -130,4 +130,4 @@ terminate(_Reason, #state{channel = Channel}) ->
%% @private
code_change(_OldVsn, State, _Extra) ->
- State.
+ {ok, State}.
View
46 src/amqp_uri.erl
@@ -18,7 +18,7 @@
-include("amqp_client.hrl").
--export([parse/1]).
+-export([parse/1, parse/2]).
%%---------------------------------------------------------------------------
%% AMQP URI Parsing
@@ -41,8 +41,10 @@
%% frame_max, and heartbeat. The extra parameters that may be
%% specified for an SSL connection are cacertfile, certfile, keyfile,
%% verify, and fail_if_no_peer_cert.
-parse(Uri) ->
- try case parse1(Uri) of
+parse(Uri) -> parse(Uri, <<"/">>).
+
+parse(Uri, DefaultVHost) ->
+ try case parse1(Uri, DefaultVHost) of
{ok, #amqp_params_network{host = undefined,
username = User,
virtual_host = Vhost}} ->
@@ -55,7 +57,7 @@ parse(Uri) ->
error:Err -> {error, {Err, Uri}}
end.
-parse1(Uri) when is_list(Uri) ->
+parse1(Uri, DefaultVHost) when is_list(Uri) ->
case uri_parser:parse(Uri, [{host, undefined}, {path, undefined},
{port, undefined}, {'query', []}]) of
{error, Err} ->
@@ -63,13 +65,13 @@ parse1(Uri) when is_list(Uri) ->
Parsed ->
Endpoint =
case string:to_lower(proplists:get_value(scheme, Parsed)) of
- "amqp" -> build_broker(Parsed);
- "amqps" -> build_ssl_broker(Parsed);
+ "amqp" -> build_broker(Parsed, DefaultVHost);
+ "amqps" -> build_ssl_broker(Parsed, DefaultVHost);
Scheme -> fail({unexpected_uri_scheme, Scheme})
end,
return({ok, broker_add_query(Endpoint, Parsed)})
end;
-parse1(_) ->
+parse1(_, _DefaultVHost) ->
fail(expected_string_uri).
unescape_string(Atom) when is_atom(Atom) ->
@@ -87,7 +89,7 @@ unescape_string([$% | Rest]) ->
unescape_string([C | Rest]) ->
[C | unescape_string(Rest)].
-build_broker(ParsedUri) ->
+build_broker(ParsedUri, DefaultVHost) ->
[Host, Port, Path] =
[proplists:get_value(F, ParsedUri) || F <- [host, port, path]],
case Port =:= undefined orelse (0 < Port andalso Port =< 65535) of
@@ -95,16 +97,17 @@ build_broker(ParsedUri) ->
false -> fail({port_out_of_range, Port})
end,
VHost = case Path of
- undefined -> <<"/">>;
+ undefined -> DefaultVHost;
[$/|Rest] -> case string:chr(Rest, $/) of
0 -> list_to_binary(unescape_string(Rest));
_ -> fail({invalid_vhost, Rest})
end
end,
UserInfo = proplists:get_value(userinfo, ParsedUri),
- Ps = #amqp_params_network{host = unescape_string(Host),
- port = Port,
- virtual_host = VHost},
+ Ps = #amqp_params_network{host = unescape_string(Host),
+ port = Port,
+ virtual_host = VHost,
+ auth_mechanisms = mechanisms(ParsedUri)},
case UserInfo of
[U, P | _] -> Ps#amqp_params_network{
username = list_to_binary(unescape_string(U)),
@@ -114,8 +117,8 @@ build_broker(ParsedUri) ->
_ -> Ps
end.
-build_ssl_broker(ParsedUri) ->
- Params = build_broker(ParsedUri),
+build_ssl_broker(ParsedUri, DefaultVHost) ->
+ Params = build_broker(ParsedUri, DefaultVHost),
Query = proplists:get_value('query', ParsedUri),
SSLOptions =
run_state_monad(
@@ -185,8 +188,19 @@ find_boolean_parameter(Value) ->
false -> fail({require_boolean, Bool})
end.
-find_atom_parameter(Value) ->
- return(list_to_atom(Value)).
+find_atom_parameter(Value) -> return(list_to_atom(Value)).
+
+mechanisms(ParsedUri) ->
+ Query = proplists:get_value('query', ParsedUri),
+ Mechanisms = case proplists:get_all_values("auth_mechanism", Query) of
+ [] -> ["plain", "amqplain"];
+ Mechs -> Mechs
+ end,
+ [case [list_to_atom(T) || T <- string:tokens(Mech, ":")] of
+ [F] -> fun (R, P, S) -> amqp_auth_mechanisms:F(R, P, S) end;
+ [M, F] -> fun (R, P, S) -> M:F(R, P, S) end;
+ L -> throw({not_mechanism, L})
+ end || Mech <- Mechanisms].
%% --=: Plain state monad implementation start :=--
run_state_monad(FunList, State) ->
Please sign in to comment.
Something went wrong with that request. Please try again.