Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #2 from djui/master

Update to v2.6.1
  • Loading branch information...
commit 77917297c99da59f2b170a352efc80dd5389d13f 2 parents c4f42f7 + 166d608
@jbrisbin authored
View
2  README.md
@@ -5,7 +5,7 @@ This is a fork of the [official RabbitMQ/AMQP Erlang client](https://github.com/
It's meant to be included in your rebar projects in your rebar.config file:
{deps, [
- {amqp_client, ".*", {git, "git://github.com/jbrisbin/amqp_client.git", {tag, "rabbitmq_2.6.0"}}}
+ {amqp_client, ".*", {git, "git://github.com/jbrisbin/amqp_client.git", {tag, "rabbitmq_2.6.1"}}}
]}.
This is simply a re-packaging of the AMQP client, which is licensed under the MPL:
View
4 rebar.config
@@ -1,3 +1,3 @@
{deps, [
- {rabbit_common, ".*", {git, "git://github.com/jbrisbin/rabbit_common.git", {tag, "rabbitmq_2.6.0"}}}
-]}.
+ {rabbit_common, ".*", {git, "git://github.com/jbrisbin/rabbit_common.git", {tag, "rabbitmq_2.6.1"}}}
+]}.
View
3  src/amqp_client.app.src
@@ -1,7 +1,8 @@
{application, amqp_client,
[{description, "RabbitMQ Erlang Client Library"},
- {vsn, "2.6.0"},
+ {vsn, "2.6.1"},
{modules, []},
{registered, []},
{env, []},
+ {mod, {amqp_client, []}},
{applications, [kernel, stdlib]}]}.
View
99 src/amqp_direct_consumer.erl
@@ -0,0 +1,99 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
+%%
+
+%% @doc This module is an implementation of the amqp_gen_consumer
+%% behaviour and can be used as part of the Consumer parameter when
+%% opening AMQP channels.
+%% <br/>
+%% <br/>
+%% The Consumer parameter for this implementation is {{@module},
+%% [ConsumerPid]@}, where ConsumerPid is a process that will receive
+%% queue subscription-related messages.<br/>
+%% <br/>
+%% This consumer implementation causes the channel to send to the
+%% ConsumerPid all basic.consume, basic.consume_ok, basic.cancel,
+%% basic.cancel_ok and basic.deliver messages received from the
+%% server.
+%% <br/>
+%% <br/>
+%% In addition, this consumer implementation monitors the ConsumerPid
+%% and exits with the same shutdown reason when it dies. 'DOWN'
+%% messages from other sources are passed to ConsumerPid.
+%% <br/>
+%% Warning! It is not recommended to rely on a consumer on killing off the
+%% channel (through the exit signal). That may cause messages to get lost.
+%% Always use amqp_channel:close/{1,3} for a clean shut down.<br/>
+%% <br/>
+%% This module has no public functions.
+-module(amqp_direct_consumer).
+
+-include("amqp_gen_consumer_spec.hrl").
+
+-behaviour(amqp_gen_consumer).
+
+-export([init/1, handle_consume_ok/3, handle_consume/3, handle_cancel_ok/3,
+ handle_cancel/2, handle_deliver/3, handle_info/2, handle_call/3,
+ terminate/2]).
+
+%%---------------------------------------------------------------------------
+%% amqp_gen_consumer callbacks
+%%---------------------------------------------------------------------------
+
+%% @private
+init([ConsumerPid]) ->
+ erlang:monitor(process, ConsumerPid),
+ {ok, ConsumerPid}.
+
+%% @private
+handle_consume(M, A, C) ->
+ C ! {M, A},
+ {ok, C}.
+
+%% @private
+handle_consume_ok(M, _, C) ->
+ C ! M,
+ {ok, C}.
+
+%% @private
+handle_cancel(M, C) ->
+ C ! M,
+ {ok, C}.
+
+%% @private
+handle_cancel_ok(M, _, C) ->
+ C ! M,
+ {ok, C}.
+
+%% @private
+handle_deliver(M, A, C) ->
+ C ! {M, A},
+ {ok, C}.
+
+%% @private
+handle_info({'DOWN', _MRef, process, C, Info}, C) ->
+ {error, {consumer_died, Info}, C};
+handle_info({'DOWN', MRef, process, Pid, Info}, C) ->
+ C ! {'DOWN', MRef, process, Pid, Info},
+ {ok, C}.
+
+%% @private
+handle_call(M, A, C) ->
+ C ! {M, A},
+ {reply, ok, C}.
+
+%% @private
+terminate(_Reason, C) ->
+ C.
View
252 src/amqp_gen_consumer.erl
@@ -0,0 +1,252 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
+%%
+
+%% @doc A behaviour module for implementing consumers for
+%% amqp_channel. To specify a consumer implementation for a channel,
+%% use amqp_connection:open_channel/{2,3}.
+%% <br/>
+%% All callbacks are called within the gen_consumer process. <br/>
+%% <br/>
+%% See comments in amqp_gen_consumer.erl source file for documentation
+%% on the callback functions.
+%% <br/>
+%% Note that making calls to the channel from the callback module will
+%% result in deadlock.
+-module(amqp_gen_consumer).
+
+-include("amqp_client.hrl").
+
+-behaviour(gen_server2).
+
+-export([start_link/2, call_consumer/2, call_consumer/3]).
+-export([behaviour_info/1]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2, prioritise_info/2]).
+
+-record(state, {module,
+ module_state}).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+%% @type ok_error() = {ok, state()} | {error, reason(), state()}.
+%% Denotes a successful or an error return from a consumer module call.
+
+start_link(ConsumerModule, ExtraParams) ->
+ gen_server2:start_link(?MODULE, [ConsumerModule, ExtraParams], []).
+
+%% @spec (Consumer, Msg) -> ok
+%% where
+%% Consumer = pid()
+%% Msg = any()
+%%
+%% @doc This function is used to perform arbitrary calls into the
+%% consumer module.
+call_consumer(Pid, Msg) ->
+ gen_server2:call(Pid, {consumer_call, Msg}, infinity).
+
+%% @spec (Consumer, Method, Args) -> ok
+%% where
+%% Consumer = pid()
+%% Method = amqp_method()
+%% Args = any()
+%%
+%% @doc This function is used by amqp_channel to forward received
+%% methods and deliveries to the consumer module.
+call_consumer(Pid, Method, Args) ->
+ gen_server2:call(Pid, {consumer_call, Method, Args}, infinity).
+
+%%---------------------------------------------------------------------------
+%% Behaviour
+%%---------------------------------------------------------------------------
+
+%% @private
+behaviour_info(callbacks) ->
+ [
+ %% init(Args) -> {ok, InitialState} | {stop, Reason} | ignore
+ %% where
+ %% Args = [any()]
+ %% InitialState = state()
+ %% Reason = term()
+ %%
+ %% This callback is invoked by the channel, when it starts
+ %% up. Use it to initialize the state of the consumer. In case of
+ %% an error, return {stop, Reason} or ignore.
+ {init, 1},
+
+ %% handle_consume(Consume, Sender, State) -> ok_error()
+ %% where
+ %% Consume = #'basic.consume'{}
+ %% Sender = pid()
+ %% State = state()
+ %%
+ %% This callback is invoked by the channel before a basic.consume
+ %% is sent to the server.
+ {handle_consume, 3},
+
+ %% handle_consume_ok(ConsumeOk, Consume, State) -> ok_error()
+ %% where
+ %% ConsumeOk = #'basic.consume_ok'{}
+ %% Consume = #'basic.consume'{}
+ %% State = state()
+ %%
+ %% This callback is invoked by the channel every time a
+ %% basic.consume_ok is received from the server. Consume is the original
+ %% method sent out to the server - it can be used to associate the
+ %% call with the response.
+ {handle_consume_ok, 3},
+
+ %% handle_cancel(Cancel, State) -> ok_error()
+ %% where
+ %% Cancel = #'basic.cancel'{}
+ %% State = state()
+ %%
+ %% This callback is invoked by the channel every time a basic.cancel
+ %% is received from the server.
+ {handle_cancel, 2},
+
+ %% handle_cancel_ok(CancelOk, Cancel, State) -> ok_error()
+ %% where
+ %% CancelOk = #'basic.cancel_ok'{}
+ %% Cancel = #'basic.cancel'{}
+ %% State = state()
+ %%
+ %% This callback is invoked by the channel every time a basic.cancel_ok
+ %% is received from the server.
+ {handle_cancel_ok, 3},
+
+ %% handle_deliver(Deliver, Message, State) -> ok_error()
+ %% where
+ %% Deliver = #'basic.deliver'{}
+ %% Message = #amqp_msg{}
+ %% State = state()
+ %%
+ %% This callback is invoked by the channel every time a basic.deliver
+ %% is received from the server.
+ {handle_deliver, 3},
+
+ %% handle_info(Info, State) -> ok_error()
+ %% where
+ %% Info = any()
+ %% State = state()
+ %%
+ %% This callback is invoked the consumer process receives a
+ %% message.
+ {handle_info, 2},
+
+ %% handle_call(Msg, From, State) -> {reply, Reply, NewState} |
+ %% {noreply, NewState} |
+ %% {error, Reason, NewState}
+ %% where
+ %% Msg = any()
+ %% From = any()
+ %% Reply = any()
+ %% State = state()
+ %% NewState = state()
+ %%
+ %% This callback is invoked by the channel when calling
+ %% amqp_channel:call_consumer/2. Reply is the term that
+ %% amqp_channel:call_consumer/2 will return. If the callback
+ %% returns {noreply, _}, then the caller to
+ %% amqp_channel:call_consumer/2 and the channel remain blocked
+ %% until gen_server2:reply/2 is used with the provided From as
+ %% the first argument.
+ {handle_call, 3},
+
+ %% terminate(Reason, State) -> any()
+ %% where
+ %% Reason = any()
+ %% State = state()
+ %%
+ %% This callback is invoked by the channel after it has shut down and
+ %% just before its process exits.
+ {terminate, 2}
+ ];
+behaviour_info(_Other) ->
+ undefined.
+
+%%---------------------------------------------------------------------------
+%% gen_server2 callbacks
+%%---------------------------------------------------------------------------
+
+init([ConsumerModule, ExtraParams]) ->
+ case ConsumerModule:init(ExtraParams) of
+ {ok, MState} ->
+ {ok, #state{module = ConsumerModule, module_state = MState}};
+ {stop, Reason} ->
+ {stop, Reason};
+ ignore ->
+ ignore
+ end.
+
+prioritise_info({'DOWN', _MRef, process, _Pid, _Info}, _State) -> 1;
+prioritise_info(_, _State) -> 0.
+
+handle_call({consumer_call, Msg}, From,
+ State = #state{module = ConsumerModule,
+ module_state = MState}) ->
+ case ConsumerModule:handle_call(Msg, From, MState) of
+ {noreply, NewMState} ->
+ {noreply, State#state{module_state = NewMState}};
+ {reply, Reply, NewMState} ->
+ {reply, Reply, State#state{module_state = NewMState}};
+ {error, Reason, NewMState} ->
+ {stop, {error, Reason}, {error, Reason},
+ State#state{module_state = NewMState}}
+ end;
+handle_call({consumer_call, Method, Args}, _From,
+ State = #state{module = ConsumerModule,
+ module_state = MState}) ->
+ Return =
+ case Method of
+ #'basic.consume'{} ->
+ ConsumerModule:handle_consume(Method, Args, MState);
+ #'basic.consume_ok'{} ->
+ ConsumerModule:handle_consume_ok(Method, Args, MState);
+ #'basic.cancel'{} ->
+ ConsumerModule:handle_cancel(Method, MState);
+ #'basic.cancel_ok'{} ->
+ ConsumerModule:handle_cancel_ok(Method, Args, MState);
+ #'basic.deliver'{} ->
+ ConsumerModule:handle_deliver(Method, Args, MState)
+ end,
+ case Return of
+ {ok, NewMState} ->
+ {reply, ok, State#state{module_state = NewMState}};
+ {error, Reason, NewMState} ->
+ {stop, {error, Reason}, {error, Reason},
+ State#state{module_state = NewMState}}
+ end.
+
+handle_cast(_What, State) ->
+ {noreply, State}.
+
+handle_info(Info, State = #state{module_state = MState,
+ module = ConsumerModule}) ->
+ case ConsumerModule:handle_info(Info, MState) of
+ {ok, NewMState} ->
+ {noreply, State#state{module_state = NewMState}};
+ {error, Reason, NewMState} ->
+ {stop, {error, Reason}, {error, Reason},
+ State#state{module_state = NewMState}}
+ end.
+
+terminate(Reason, #state{module = ConsumerModule, module_state = MState}) ->
+ ConsumerModule:terminate(Reason, MState).
+
+code_change(_OldVsn, State, _Extra) ->
+ State.
View
245 src/amqp_selective_consumer.erl
@@ -0,0 +1,245 @@
+%% The contents of this file are subject to the Mozilla Public Licensbe
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011-2011 VMware, Inc. All rights reserved.
+%%
+
+%% @doc This module is an implementation of the amqp_gen_consumer
+%% behaviour and can be used as part of the Consumer parameter when
+%% opening AMQP channels. This is the default implementation selected
+%% by channel. <br/>
+%% <br/>
+%% The Consumer parameter for this implementation is {{@module}, []@}<br/>
+%% This consumer implementation keeps track of consumer tags and sends
+%% the subscription-relevant messages to the registered consumers, according
+%% to an internal tag dictionary.<br/>
+%% <br/>
+%% Send a #basic.consume{} message to the channel to subscribe a
+%% consumer to a queue and send a #basic.cancel{} message to cancel a
+%% subscription.<br/>
+%% <br/>
+%% The channel will send to the relevant registered consumers the
+%% basic.consume_ok, basic.cancel_ok, basic.cancel and basic.deliver messages
+%% received from the server.<br/>
+%% <br/>
+%% If a consumer is not registered for a given consumer tag, the message
+%% is sent to the default consumer registered with
+%% {@module}:register_default_consumer. If there is no default consumer
+%% registered in this case, an exception occurs and the channel is abruptly
+%% terminated.<br/>
+-module(amqp_selective_consumer).
+
+-include("amqp_client.hrl").
+-include("amqp_gen_consumer_spec.hrl").
+
+-behaviour(amqp_gen_consumer).
+
+-export([register_default_consumer/2]).
+-export([init/1, handle_consume_ok/3, handle_consume/3, handle_cancel_ok/3,
+ handle_cancel/2, handle_deliver/3, handle_info/2, handle_call/3,
+ terminate/2]).
+
+-record(state, {consumers = dict:new(), %% Tag -> ConsumerPid
+ unassigned = undefined, %% Pid
+ monitors = dict:new(), %% Pid -> {Count, MRef}
+ default_consumer = none}).
+
+%%---------------------------------------------------------------------------
+%% Interface
+%%---------------------------------------------------------------------------
+
+%% @spec (ChannelPid, ConsumerPid) -> ok
+%% where
+%% ChannelPid = pid()
+%% ConsumerPid = pid()
+%% @doc This function registers a default consumer with the channel. A
+%% default consumer is used when a subscription is made via
+%% amqp_channel:call(ChannelPid, #'basic.consume'{}) (rather than
+%% {@module}:subscribe/3) and hence there is no consumer pid
+%% registered with the consumer tag. In this case, the relevant
+%% deliveries will be sent to the default consumer.
+register_default_consumer(ChannelPid, ConsumerPid) ->
+ amqp_channel:call_consumer(ChannelPid,
+ {register_default_consumer, ConsumerPid}).
+
+%%---------------------------------------------------------------------------
+%% amqp_gen_consumer callbacks
+%%---------------------------------------------------------------------------
+
+%% @private
+init([]) ->
+ {ok, #state{}}.
+
+%% @private
+handle_consume(BasicConsume, Pid, State = #state{consumers = Consumers,
+ monitors = Monitors}) ->
+ Tag = tag(BasicConsume),
+ Ok =
+ case BasicConsume of
+ #'basic.consume'{nowait = true}
+ when Tag =:= undefined orelse size(Tag) == 0 ->
+ false; %% Async and undefined tag
+ _ when is_binary(Tag) andalso size(Tag) >= 0 ->
+ case resolve_consumer(Tag, State) of
+ {consumer, _} -> false; %% Tag already in use
+ _ -> true
+ end;
+ _ ->
+ true
+ end,
+ case {Ok, BasicConsume} of
+ {true, #'basic.consume'{nowait = true}} ->
+ {ok, State#state
+ {consumers = dict:store(Tag, Pid, Consumers),
+ monitors = add_to_monitor_dict(Pid, Monitors)}};
+ {true, #'basic.consume'{nowait = false}} ->
+ {ok, State#state{unassigned = Pid}};
+ {false, #'basic.consume'{nowait = true}} ->
+ {error, 'no_consumer_tag_specified', State};
+ {false, #'basic.consume'{nowait = false}} ->
+ %% Don't do anything (don't override existing
+ %% consumers), the server will close the channel with an error.
+ {ok, State}
+ end.
+
+%% @private
+handle_consume_ok(BasicConsumeOk, _BasicConsume,
+ State = #state{unassigned = Pid,
+ consumers = Consumers,
+ monitors = Monitors})
+ when is_pid(Pid) ->
+ State1 =
+ State#state{
+ consumers = dict:store(tag(BasicConsumeOk), Pid, Consumers),
+ monitors = add_to_monitor_dict(Pid, Monitors),
+ unassigned = undefined},
+ deliver(BasicConsumeOk, State1),
+ {ok, State1}.
+
+%% @private
+%% The server sent a basic.cancel.
+handle_cancel(Cancel, State) ->
+ State1 = do_cancel(Cancel, State),
+ %% Use old state
+ deliver(Cancel, State),
+ {ok, State1}.
+
+%% @private
+%% We sent a basic.cancel and now receive the ok.
+handle_cancel_ok(CancelOk, _Cancel, State) ->
+ State1 = do_cancel(CancelOk, State),
+ %% Use old state
+ deliver(CancelOk, State),
+ {ok, State1}.
+
+%% @private
+handle_deliver(Deliver, Message, State) ->
+ deliver(Deliver, Message, State),
+ {ok, State}.
+
+%% @private
+handle_info({'DOWN', _MRef, process, Pid, _Info},
+ State = #state{monitors = Monitors,
+ consumers = Consumers,
+ default_consumer = DConsumer }) ->
+ case dict:find(Pid, Monitors) of
+ {ok, _CountMRef} ->
+ {ok, State#state{monitors = dict:erase(Pid, Monitors),
+ consumers =
+ dict:filter(
+ fun (_, Pid1) when Pid1 =:= Pid -> false;
+ (_, _) -> true
+ end, Consumers)}};
+ error ->
+ case Pid of
+ DConsumer -> {ok, State#state{
+ monitors = dict:erase(Pid, Monitors),
+ default_consumer = none}};
+ _ -> {ok, State} %% unnamed consumer went down
+ %% before receiving consume_ok
+ end
+ end.
+
+%% @private
+handle_call({register_default_consumer, Pid}, _From,
+ State = #state{default_consumer = PrevPid,
+ monitors = Monitors}) ->
+ Monitors1 = case PrevPid of
+ none -> Monitors;
+ _ -> remove_from_monitor_dict(PrevPid, Monitors)
+ end,
+ {reply, ok,
+ State#state{default_consumer = Pid,
+ monitors = add_to_monitor_dict(Pid, Monitors1)}}.
+
+%% @private
+terminate(_Reason, State) ->
+ State.
+
+%%---------------------------------------------------------------------------
+%% Internal plumbing
+%%---------------------------------------------------------------------------
+
+deliver(Msg, State) ->
+ deliver(Msg, undefined, State).
+deliver(Msg, Message, State) ->
+ Combined = if Message =:= undefined -> Msg;
+ true -> {Msg, Message}
+ end,
+ case resolve_consumer(tag(Msg), State) of
+ {consumer, Pid} -> Pid ! Combined;
+ {default, Pid} -> Pid ! Combined;
+ error -> exit(unexpected_delivery_and_no_default_consumer)
+ end.
+
+do_cancel(Cancel, State = #state{consumers = Consumers,
+ monitors = Monitors}) ->
+ Tag = tag(Cancel),
+ case dict:find(Tag, Consumers) of
+ {ok, Pid} -> State#state{
+ consumers = dict:erase(Tag, Consumers),
+ monitors = remove_from_monitor_dict(Pid, Monitors)};
+ error -> %% Untracked consumer. Do nothing.
+ State
+ end.
+
+resolve_consumer(Tag, #state{consumers = Consumers,
+ default_consumer = DefaultConsumer}) ->
+ case dict:find(Tag, Consumers) of
+ {ok, ConsumerPid} -> {consumer, ConsumerPid};
+ error -> case DefaultConsumer of
+ none -> error;
+ _ -> {default, DefaultConsumer}
+ end
+ end.
+
+tag(#'basic.consume'{consumer_tag = Tag}) -> Tag;
+tag(#'basic.consume_ok'{consumer_tag = Tag}) -> Tag;
+tag(#'basic.cancel'{consumer_tag = Tag}) -> Tag;
+tag(#'basic.cancel_ok'{consumer_tag = Tag}) -> Tag;
+tag(#'basic.deliver'{consumer_tag = Tag}) -> Tag.
+
+add_to_monitor_dict(Pid, Monitors) ->
+ case dict:find(Pid, Monitors) of
+ error -> dict:store(Pid,
+ {1, erlang:monitor(process, Pid)},
+ Monitors);
+ {ok, {Count, MRef}} -> dict:store(Pid, {Count + 1, MRef}, Monitors)
+ end.
+
+remove_from_monitor_dict(Pid, Monitors) ->
+ case dict:fetch(Pid, Monitors) of
+ {1, MRef} -> erlang:demonitor(MRef),
+ dict:erase(Pid, Monitors);
+ {Count, MRef} -> dict:store(Pid, {Count - 1, MRef}, Monitors)
+ end.
Please sign in to comment.
Something went wrong with that request. Please try again.