Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Updated to RabbitMQ 2.7.0

  • Loading branch information...
commit a29de89b979315b53c7507e4d6e0ecdfd6896891 1 parent cb4ffed
@jbrisbin authored
View
2  README.md
@@ -5,7 +5,7 @@ This is a fork of the [official RabbitMQ/AMQP Erlang client](https://github.com/
It's meant to be included in your rebar projects in your rebar.config file:
{deps, [
- {amqp_client, ".*", {git, "git://github.com/jbrisbin/amqp_client.git", {tag, "rabbitmq_2.6.1"}}}
+ {amqp_client, ".*", {git, "git://github.com/jbrisbin/amqp_client.git", {tag, "rabbitmq_2.7.0"}}}
]}.
This is simply a re-packaging of the AMQP client, which is licensed under the MPL:
View
24 include/amqp_client.hrl
@@ -32,19 +32,21 @@
-record(amqp_msg, {props = #'P_basic'{}, payload = <<>>}).
--record(amqp_params_network, {username = <<"guest">>,
- password = <<"guest">>,
- virtual_host = <<"/">>,
- host = "localhost",
- port = undefined,
- channel_max = 0,
- frame_max = 0,
- heartbeat = 0,
- ssl_options = none,
- auth_mechanisms =
+-record(amqp_params_network, {username = <<"guest">>,
+ password = <<"guest">>,
+ virtual_host = <<"/">>,
+ host = "localhost",
+ port = undefined,
+ channel_max = 0,
+ frame_max = 0,
+ heartbeat = 0,
+ connection_timeout = infinity,
+ ssl_options = none,
+ auth_mechanisms =
[fun amqp_auth_mechanisms:plain/3,
fun amqp_auth_mechanisms:amqplain/3],
- client_properties = []}).
+ client_properties = [],
+ socket_options = []}).
-record(amqp_params_direct, {username = <<"guest">>,
virtual_host = <<"/">>,
View
2  rebar.config
@@ -1,3 +1,3 @@
{deps, [
- {rabbit_common, ".*", {git, "git://github.com/jbrisbin/rabbit_common.git", {tag, "rabbitmq_2.6.1"}}}
+ {rabbit_common, ".*", {git, "git://github.com/jbrisbin/rabbit_common.git", {tag, "rabbitmq_2.7.0"}}}
]}.
View
32 src/amqp_channel.erl
@@ -79,7 +79,6 @@
handle_info/2]).
-define(TIMEOUT_FLUSH, 60000).
--define(TIMEOUT_CLOSE_OK, 3000).
-record(state, {number,
connection,
@@ -393,11 +392,6 @@ handle_info(timed_out_flushing_channel, State) ->
"connection closing~n", [self()]),
{stop, timed_out_flushing_channel, State};
%% @private
-handle_info(timed_out_waiting_close_ok, State) ->
- ?LOG_WARN("Channel (~p) closing: timed out waiting for "
- "channel.close_ok while connection closing~n", [self()]),
- {stop, timed_out_waiting_close_ok, State};
-%% @private
handle_info({'DOWN', _, process, ReturnHandler, Reason},
State = #state{return_handler_pid = ReturnHandler}) ->
?LOG_WARN("Channel (~p): Unregistering return handler ~p because it died. "
@@ -569,7 +563,6 @@ handle_method_from_server1(#'channel.close'{reply_code = Code,
%% Both client and server sent close at the same time. Don't shutdown yet,
%% wait for close_ok.
do(#'channel.close_ok'{}, none, State),
- erlang:send_after(?TIMEOUT_CLOSE_OK, self(), timed_out_waiting_close_ok),
{noreply,
State#state{
closing = {just_channel, {server_initiated_close, Code, Text}}}};
@@ -662,8 +655,6 @@ handle_connection_closing(CloseType, Reason,
timed_out_flushing_channel),
{noreply, NewState};
{flush, {just_channel, _}, false} ->
- erlang:send_after(?TIMEOUT_CLOSE_OK, self(),
- timed_out_waiting_close_ok),
{noreply, NewState};
_ ->
handle_shutdown({connection_closing, Reason}, NewState)
@@ -774,16 +765,31 @@ handle_nack(State = #state{waiting_set = WSet}) ->
close(self(), 200, <<"Nacks Received">>)
end.
-update_confirm_set(#'basic.ack'{delivery_tag = SeqNo},
+update_confirm_set(#'basic.ack'{delivery_tag = SeqNo,
+ multiple = Multiple},
State = #state{unconfirmed_set = USet}) ->
maybe_notify_waiters(
- State#state{unconfirmed_set = gb_sets:del_element(SeqNo, USet)});
-update_confirm_set(#'basic.nack'{delivery_tag = SeqNo},
+ State#state{unconfirmed_set =
+ update_unconfirmed(SeqNo, Multiple, USet)});
+update_confirm_set(#'basic.nack'{delivery_tag = SeqNo,
+ multiple = Multiple},
State = #state{unconfirmed_set = USet}) ->
maybe_notify_waiters(
- State#state{unconfirmed_set = gb_sets:del_element(SeqNo, USet),
+ State#state{unconfirmed_set = update_unconfirmed(SeqNo, Multiple, USet),
only_acks_received = false}).
+update_unconfirmed(SeqNo, false, USet) ->
+ gb_sets:del_element(SeqNo, USet);
+update_unconfirmed(SeqNo, true, USet) ->
+ case gb_sets:is_empty(USet) of
+ true -> USet;
+ false -> {S, USet1} = gb_sets:take_smallest(USet),
+ case S > SeqNo of
+ true -> USet;
+ false -> update_unconfirmed(SeqNo, true, USet1)
+ end
+ end.
+
maybe_notify_waiters(State = #state{unconfirmed_set = USet}) ->
case gb_sets:is_empty(USet) of
false -> State;
View
2  src/amqp_client.app.src
@@ -1,6 +1,6 @@
{application, amqp_client,
[{description, "RabbitMQ Erlang Client Library"},
- {vsn, "2.6.1"},
+ {vsn, "2.7.0"},
{modules, []},
{registered, []},
{env, []},
View
8 src/amqp_connection.erl
@@ -112,10 +112,18 @@
%% defaults to 0 (network only)</li>
%% <li>heartbeat :: non_neg_integer() - The hearbeat interval in seconds,
%% defaults to 0 (turned off) (network only)</li>
+%% <li>connection_timeout :: non_neg_integer() | 'infinity'
+%% - The connection timeout in milliseconds,
+%% defaults to 'infinity' (network only)</li>
%% <li>ssl_options :: term() - The second parameter to be used with the
%% ssl:connect/2 function, defaults to 'none' (network only)</li>
%% <li>client_properties :: [{binary(), atom(), binary()}] - A list of extra
%% client properties to be sent to the server, defaults to []</li>
+%% <li>socket_options :: [any()] - Extra socket options. These are
+%% appended to the default options. See
+%% <a href="http://www.erlang.org/doc/man/inet.html#setopts-2">inet:setopts/2</a>
+%% and <a href="http://www.erlang.org/doc/man/gen_tcp.html#connect-4">
+%% gen_tcp:connect/4</a> for descriptions of the available options.</li>
%% </ul>
View
8 src/amqp_gen_connection.erl
@@ -28,8 +28,6 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
--define(CLIENT_CLOSE_TIMEOUT, 60000).
-
-define(INFO_KEYS, [server_properties, is_closing, amqp_params, num_channels,
channel_max]).
@@ -242,7 +240,7 @@ handle_command({open_channel, ProposedNumber, Consumer}, _From,
{reply, amqp_channels_manager:open_channel(ChMgr, ProposedNumber, Consumer,
Mod:open_channel_args(MState)),
State};
- handle_command({close, #'connection.close'{} = Close}, From, State) ->
+handle_command({close, #'connection.close'{} = Close}, From, State) ->
app_initiated_close(Close, From, State).
%%---------------------------------------------------------------------------
@@ -331,9 +329,7 @@ handle_channels_terminated(State = #state{closing = Closing,
server_initiated_close ->
Mod:do(#'connection.close_ok'{}, MState);
_ ->
- Mod:do(Close, MState),
- erlang:send_after(?CLIENT_CLOSE_TIMEOUT, self(),
- {'$gen_cast', timeout_waiting_for_close_ok})
+ Mod:do(Close, MState)
end,
case callback(channels_terminated, [], State) of
{stop, _, _} = Stop -> case From of none -> ok;
View
23 src/amqp_network_connection.erl
@@ -55,9 +55,6 @@ do2(Method, #state{writer0 = Writer}) ->
%% Catching because it expects the {channel_exit, _} message on error
catch rabbit_writer:send_command_sync(Writer, Method).
-handle_message(timeout_waiting_for_close_ok,
- State = #state{closing_reason = Reason}) ->
- {stop, {timeout_waiting_for_close_ok, Reason}, State};
handle_message(socket_closing_timeout,
State = #state{closing_reason = Reason}) ->
{stop, {socket_closing_timeout, Reason}, State};
@@ -105,20 +102,28 @@ connect(AmqpParams = #amqp_params_network{host = Host}, SIF, ChMgr, State) ->
end.
do_connect({Addr, Family},
- AmqpParams = #amqp_params_network{ssl_options = none,
- port = Port},
+ AmqpParams = #amqp_params_network{ssl_options = none,
+ port = Port,
+ connection_timeout = Timeout,
+ socket_options = ExtraOpts},
SIF, ChMgr, State) ->
- case gen_tcp:connect(Addr, Port, [Family | ?RABBIT_TCP_OPTS]) of
+ case gen_tcp:connect(Addr, Port,
+ [Family | ?RABBIT_TCP_OPTS] ++ ExtraOpts,
+ Timeout) of
{ok, Sock} -> try_handshake(AmqpParams, SIF, ChMgr,
State#state{sock = Sock});
{error, _} = E -> E
end;
do_connect({Addr, Family},
- AmqpParams = #amqp_params_network{ssl_options = SslOpts,
- port = Port},
+ AmqpParams = #amqp_params_network{ssl_options = SslOpts,
+ port = Port,
+ connection_timeout = Timeout,
+ socket_options = ExtraOpts},
SIF, ChMgr, State) ->
rabbit_misc:start_applications([crypto, public_key, ssl]),
- case gen_tcp:connect(Addr, Port, [Family | ?RABBIT_TCP_OPTS]) of
+ case gen_tcp:connect(Addr, Port,
+ [Family | ?RABBIT_TCP_OPTS] ++ ExtraOpts,
+ Timeout) of
{ok, Sock} ->
case ssl:connect(Sock, SslOpts) of
{ok, SslSock} ->
View
198 src/amqp_uri.erl
@@ -0,0 +1,198 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(amqp_uri).
+
+-include("amqp_client.hrl").
+
+-export([parse/1]).
+
+%%---------------------------------------------------------------------------
+%% AMQP URI Parsing
+%%---------------------------------------------------------------------------
+
+%% @spec (Uri) -> {ok, #amqp_params_network{} | #amqp_params_direct{}} |
+%% {error, {Info, Uri}}
+%% where
+%% Uri = string()
+%% Info = any()
+%%
+%% @doc Parses an AMQP URI. If any of the URI parts are missing, the
+%% default values are used. If the hostname is zero-length, an
+%% #amqp_params_direct{} record is returned; otherwise, an
+%% #amqp_params_network{} record is returned. Extra parameters may be
+%% specified via the query string (e.g. "?heartbeat=5"). In case of
+%% failure, an {error, {Info, Uri}} tuple is returned.
+%%
+%% The extra parameters that may be specified are channel_max,
+%% frame_max, and heartbeat. The extra parameters that may be
+%% specified for an SSL connection are cacertfile, certfile, keyfile,
+%% verify, and fail_if_no_peer_cert.
+parse(Uri) ->
+ try case parse1(Uri) of
+ {ok, #amqp_params_network{host = undefined,
+ username = User,
+ virtual_host = Vhost}} ->
+ return({ok, #amqp_params_direct{username = User,
+ virtual_host = Vhost}});
+ {ok, Params} ->
+ return({ok, Params})
+ end
+ catch throw:Err -> {error, {Err, Uri}};
+ error:Err -> {error, {Err, Uri}}
+ end.
+
+parse1(Uri) when is_list(Uri) ->
+ case uri_parser:parse(Uri, [{host, undefined}, {path, undefined},
+ {port, undefined}, {'query', []}]) of
+ {error, Err} ->
+ throw({unable_to_parse_uri, Err});
+ Parsed ->
+ Endpoint =
+ case string:to_lower(proplists:get_value(scheme, Parsed)) of
+ "amqp" -> build_broker(Parsed);
+ "amqps" -> build_ssl_broker(Parsed);
+ Scheme -> fail({unexpected_uri_scheme, Scheme})
+ end,
+ return({ok, broker_add_query(Endpoint, Parsed)})
+ end;
+parse1(_) ->
+ fail(expected_string_uri).
+
+unescape_string(Atom) when is_atom(Atom) ->
+ Atom;
+unescape_string([]) ->
+ [];
+unescape_string([$%, N1, N2 | Rest]) ->
+ try
+ [erlang:list_to_integer([N1, N2], 16) | unescape_string(Rest)]
+ catch
+ error:badarg -> throw({invalid_entitiy, ['%', N1, N2]})
+ end;
+unescape_string([$% | Rest]) ->
+ fail({unterminated_entity, ['%' | Rest]});
+unescape_string([C | Rest]) ->
+ [C | unescape_string(Rest)].
+
+build_broker(ParsedUri) ->
+ [Host, Port, Path] =
+ [proplists:get_value(F, ParsedUri) || F <- [host, port, path]],
+ case Port =:= undefined orelse (0 < Port andalso Port =< 65535) of
+ true -> ok;
+ false -> fail({port_out_of_range, Port})
+ end,
+ VHost = case Path of
+ undefined -> <<"/">>;
+ [$/|Rest] -> case string:chr(Rest, $/) of
+ 0 -> list_to_binary(unescape_string(Rest));
+ _ -> fail({invalid_vhost, Rest})
+ end
+ end,
+ UserInfo = proplists:get_value(userinfo, ParsedUri),
+ Ps = #amqp_params_network{host = unescape_string(Host),
+ port = Port,
+ virtual_host = VHost},
+ case UserInfo of
+ [U, P | _] -> Ps#amqp_params_network{
+ username = list_to_binary(unescape_string(U)),
+ password = list_to_binary(unescape_string(P))};
+ [U | _] -> Ps#amqp_params_network{
+ username = list_to_binary(unescape_string(U))};
+ _ -> Ps
+ end.
+
+build_ssl_broker(ParsedUri) ->
+ Params = build_broker(ParsedUri),
+ Query = proplists:get_value('query', ParsedUri),
+ SSLOptions =
+ run_state_monad(
+ [fun (L) -> KeyString = atom_to_list(Key),
+ case lists:keysearch(KeyString, 1, Query) of
+ {value, {_, Value}} ->
+ try return([{Key, unescape_string(Fun(Value))} | L])
+ catch throw:Reason ->
+ fail({invalid_ssl_parameter,
+ Key, Value, Query, Reason})
+ end;
+ false ->
+ L
+ end
+ end || {Fun, Key} <-
+ [{fun find_path_parameter/1, cacertfile},
+ {fun find_path_parameter/1, certfile},
+ {fun find_path_parameter/1, keyfile},
+ {fun find_atom_parameter/1, verify},
+ {fun find_boolean_parameter/1, fail_if_no_peer_cert}]],
+ []),
+ Params#amqp_params_network{ssl_options = SSLOptions}.
+
+broker_add_query(Params = #amqp_params_network{}, Uri) ->
+ broker_add_query(Params, Uri, record_info(fields, amqp_params_network)).
+
+broker_add_query(Params, ParsedUri, Fields) ->
+ Query = proplists:get_value('query', ParsedUri),
+ {Params1, _Pos} =
+ run_state_monad(
+ [fun ({ParamsN, Pos}) ->
+ Pos1 = Pos + 1,
+ KeyString = atom_to_list(Field),
+ case proplists:get_value(KeyString, Query) of
+ undefined ->
+ return({ParamsN, Pos1});
+ true -> %% proplists short form, not permitted
+ return({ParamsN, Pos1});
+ Value ->
+ try
+ ValueParsed = parse_amqp_param(Field, Value),
+ return(
+ {setelement(Pos, ParamsN, ValueParsed), Pos1})
+ catch throw:Reason ->
+ fail({invalid_amqp_params_parameter,
+ Field, Value, Query, Reason})
+ end
+ end
+ end || Field <- Fields], {Params, 2}),
+ Params1.
+
+parse_amqp_param(Field, String) when Field =:= channel_max orelse
+ Field =:= frame_max orelse
+ Field =:= heartbeat ->
+ try return(list_to_integer(String))
+ catch error:badarg -> fail({not_an_integer, String})
+ end;
+parse_amqp_param(Field, String) ->
+ fail({parameter_unconfigurable_in_query, Field, String}).
+
+find_path_parameter(Value) -> return(Value).
+
+find_boolean_parameter(Value) ->
+ Bool = list_to_atom(Value),
+ case is_boolean(Bool) of
+ true -> return(Bool);
+ false -> fail({require_boolean, Bool})
+ end.
+
+find_atom_parameter(Value) ->
+ return(list_to_atom(Value)).
+
+%% --=: Plain state monad implementation start :=--
+run_state_monad(FunList, State) ->
+ lists:foldl(fun (Fun, StateN) -> Fun(StateN) end, State, FunList).
+
+return(V) -> V.
+
+fail(Reason) -> throw(Reason).
+%% --=: end :=--
View
121 src/uri_parser.erl
@@ -0,0 +1,121 @@
+%% This file is a copy of http_uri.erl from the R13B-1 Erlang/OTP
+%% distribution with several modifications.
+
+%% All modifications are Copyright (c) 2009-2011 VMware, Ltd.
+
+%% ``The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved via the world wide web at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
+%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
+%% AB. All Rights Reserved.''
+
+%% See http://tools.ietf.org/html/rfc3986
+
+-module(uri_parser).
+
+-export([parse/2]).
+
+%%%=========================================================================
+%%% API
+%%%=========================================================================
+
+%% Returns a key list of elements extracted from the URI. Note that
+%% only 'scheme' is guaranteed to exist. Key-Value pairs from the
+%% Defaults list will be used absence of a non-empty value extracted
+%% from the URI. The values extracted are strings, except for 'port'
+%% which is an integer, 'userinfo' which is a list of strings (split
+%% on $:), and 'query' which is a list of strings where no $= char
+%% found, or a {key,value} pair where a $= char is found (initial
+%% split on $& and subsequent optional split on $=). Possible keys
+%% are: 'scheme', 'userinfo', 'host', 'port', 'path', 'query',
+%% 'fragment'.
+
+parse(AbsURI, Defaults) ->
+ case parse_scheme(AbsURI) of
+ {error, Reason} ->
+ {error, Reason};
+ {Scheme, Rest} ->
+ case (catch parse_uri_rest(Rest, true)) of
+ [_|_] = List ->
+ merge_keylists([{scheme, Scheme} | List], Defaults);
+ E ->
+ {error, {malformed_uri, AbsURI, E}}
+ end
+ end.
+
+%%%========================================================================
+%%% Internal functions
+%%%========================================================================
+parse_scheme(AbsURI) ->
+ split_uri(AbsURI, ":", {error, no_scheme}).
+
+parse_uri_rest("//" ++ URIPart, true) ->
+ %% we have an authority
+ {Authority, PathQueryFrag} =
+ split_uri(URIPart, "/|\\?|#", {URIPart, ""}, 1, 0),
+ AuthorityParts = parse_authority(Authority),
+ parse_uri_rest(PathQueryFrag, false) ++ AuthorityParts;
+parse_uri_rest(PathQueryFrag, _Bool) ->
+ %% no authority, just a path and maybe query
+ {PathQuery, Frag} = split_uri(PathQueryFrag, "#", {PathQueryFrag, ""}),
+ {Path, QueryString} = split_uri(PathQuery, "\\?", {PathQuery, ""}),
+ QueryPropList = split_query(QueryString),
+ [{path, Path}, {'query', QueryPropList}, {fragment, Frag}].
+
+parse_authority(Authority) ->
+ {UserInfo, HostPort} = split_uri(Authority, "@", {"", Authority}),
+ UserInfoSplit = case re:split(UserInfo, ":", [{return, list}]) of
+ [""] -> [];
+ UIS -> UIS
+ end,
+ [{userinfo, UserInfoSplit} | parse_host_port(HostPort)].
+
+parse_host_port("[" ++ HostPort) -> %ipv6
+ {Host, ColonPort} = split_uri(HostPort, "\\]", {HostPort, ""}),
+ [{host, Host} | case split_uri(ColonPort, ":", not_found, 0, 1) of
+ not_found -> case ColonPort of
+ [] -> [];
+ _ -> throw({invalid_port, ColonPort})
+ end;
+ {_, Port} -> [{port, list_to_integer(Port)}]
+ end];
+
+parse_host_port(HostPort) ->
+ {Host, Port} = split_uri(HostPort, ":", {HostPort, not_found}),
+ [{host, Host} | case Port of
+ not_found -> [];
+ _ -> [{port, list_to_integer(Port)}]
+ end].
+
+split_query(Query) ->
+ case re:split(Query, "&", [{return, list}]) of
+ [""] -> [];
+ QParams -> [split_uri(Param, "=", Param) || Param <- QParams]
+ end.
+
+split_uri(UriPart, SplitChar, NoMatchResult) ->
+ split_uri(UriPart, SplitChar, NoMatchResult, 1, 1).
+
+split_uri(UriPart, SplitChar, NoMatchResult, SkipLeft, SkipRight) ->
+ case re:run(UriPart, SplitChar) of
+ {match, [{Match, _}]} ->
+ {string:substr(UriPart, 1, Match + 1 - SkipLeft),
+ string:substr(UriPart, Match + 1 + SkipRight, length(UriPart))};
+ nomatch ->
+ NoMatchResult
+ end.
+
+merge_keylists(A, B) ->
+ {AEmpty, ANonEmpty} = lists:partition(fun ({_Key, V}) -> V =:= [] end, A),
+ [AEmptyS, ANonEmptyS, BS] =
+ [lists:ukeysort(1, X) || X <- [AEmpty, ANonEmpty, B]],
+ lists:ukeymerge(1, lists:ukeymerge(1, ANonEmptyS, BS), AEmptyS).
Please sign in to comment.
Something went wrong with that request. Please try again.