Permalink
Browse files

Added options support in connection specifications

  • Loading branch information...
1 parent 1711f10 commit 8760db4aab035ff2642fd0adc5d9fde20d1fc8cd muxspace committed May 11, 2011
Showing with 155 additions and 45 deletions.
  1. +7 −3 README
  2. +1 −0 include/bunny_farm.hrl
  3. +16 −10 src/bunny_farm.erl
  4. +6 −1 src/farm_tools.erl
  5. +18 −18 src/gen_qserver.erl
  6. +92 −9 test/gen_qserver_tests.erl
  7. +15 −4 test/my_qserver.erl
View
10 README
@@ -17,9 +17,14 @@ starting a server, an additional argument is passed to the gen_qserver's
start_link function which contains a list of connection specs. These can be
of the following forms:
<<"exchange">> -- Used for publishing
- "exchange" -- Used for publishing (converted to a binary)
+ {<<"exchange">>,Options} -- Used for publishing
{<<"exchange">>, <<"route">>} -- Used for consuming
- {"exchange", "route"} -- Used for consuming
+ {{<<"exchange">>,Options}, {<<"route">>,Options}} -- Used for consuming
+
+Any valid connection parameters for an exchange.declare or queue.declare is
+allowed, plus a few additional conveniences. To set the encoding for a given
+connection, do this:
+ {<<"exchange">>, [{encoding,<<"application/bson">>}]}
Based on the spec, a connection will be made and all bus handles will be
cached in memory by a separate server. These can be accessed in code by
@@ -208,7 +213,6 @@ The encoding property is only available for pub channels.
Future
======
-. Connection-specific overrides
. Message-specific overrides for encoding (currently this is done at the
exchange level via the config)
View
@@ -12,5 +12,6 @@
-type bus_handle() :: #bus_handle{}.
-type exchange() :: binary().
-type routing_key() :: binary().
+-type myabe_binary() :: binary() | {binary(),list()}.
-endif.
View
@@ -5,7 +5,8 @@
-export([declare_exchange/1, declare_exchange/2,
declare_queue/1, declare_queue/2, declare_queue/3,
bind/3]).
--export([consume/1, consume/2, publish/2, publish/3,
+-export([consume/1, consume/2,
+ publish/3,
rpc/3, rpc/4, respond/3]).
-ifdef(TEST).
-compile(export_all).
@@ -36,23 +37,25 @@ open(MaybeX, MaybeK) ->
BusHandle = open_it(#bus_handle{exchange=X, options=XO}),
bunny_farm:declare_exchange(BusHandle),
- Q = bunny_farm:declare_queue(BusHandle, KO),
+ case X of
+ <<"">> -> Q = bunny_farm:declare_queue(K, BusHandle, KO);
+ _ -> Q = bunny_farm:declare_queue(BusHandle, KO)
+ end,
bunny_farm:bind(Q, K, BusHandle).
+close(#bus_handle{exchange = <<"">>}) -> ok;
close(#bus_handle{channel=Channel, conn=Connection}) ->
amqp_channel:close(Channel),
amqp_connection:close(Connection).
close(#bus_handle{}=BusHandle, <<"">>) ->
close(BusHandle);
-close(#bus_handle{channel=Channel, conn=Connection}, Tag) ->
+close(#bus_handle{channel=Channel}=BusHandle, Tag) ->
#'basic.cancel_ok'{} =
amqp_channel:call(Channel, #'basic.cancel'{consumer_tag=Tag}),
- ok = amqp_channel:close(Channel),
- ok = amqp_connection:close(Connection),
- ok.
+ close(BusHandle).
@@ -66,11 +69,11 @@ consume(#bus_handle{queue=Q,channel=Channel}, Options) when is_list(Options) ->
amqp_channel:subscribe(Channel, BasicConsume, self()).
-publish(#message{}=Message, #bus_handle{}=BusHandle) ->
- publish(Message, BusHandle#bus_handle.routing_key, BusHandle);
+%publish(#message{}=Message, #bus_handle{}=BusHandle) ->
+% publish(Message, BusHandle#bus_handle.routing_key, BusHandle);
-publish(Payload, #bus_handle{}=BusHandle) ->
- publish(#message{payload=Payload}, BusHandle).
+%publish(Payload, #bus_handle{}=BusHandle) ->
+% publish(#message{payload=Payload}, BusHandle).
%% This is the recommended call to use as the same exchange can be reused
publish(#message{payload=Payload, props=Props}, K,
@@ -161,6 +164,9 @@ declare_queue(Key, #bus_handle{channel=Channel}, Options) ->
+bind(Q, _BindKey, #bus_handle{exchange= <<"">>}=BusHandle) ->
+ BusHandle#bus_handle{queue=Q};
+
bind(Q, BindKey, #bus_handle{exchange=X, channel=Channel}=BusHandle) ->
QueueBind = #'queue.bind'{exchange=X, queue=Q, routing_key=BindKey},
#'queue.bind_ok'{} = amqp_channel:call(Channel, QueueBind),
View
@@ -10,7 +10,9 @@
to_amqp_props/1,
to_basic_consume/1,
is_rpc/1,
- reply_to/1, reply_to/2, content_type/1, correlation_id/1 ]).
+ reply_to/1, reply_to/2,
+ content_type/1, content_type/2,
+ correlation_id/1 ]).
%% Properties is a 'P_basic' record. We convert it back to a tuple
%% list
@@ -153,6 +155,9 @@ content_type(#amqp_msg{}=Content) ->
content_type(Props) when is_list(Props) ->
p(content_type, Props).
+%% Override the content type in a bus handle
+content_type(ContentType, #bus_handle{options=Os}=BusHandle) ->
+ BusHandle#bus_handle{options=lists:merge([{content_type,ContentType}],Os)}.
p(P, #amqp_msg{}=Content) ->
p(P, farm_tools:decode_properties(Content), undefined);
View
@@ -49,30 +49,30 @@ bus(CachePid, {id,X}) ->
BH -> BH
end.
-%% Consume
--spec connect({exchange(), routing_key()}) -> [ {exchange(), boolean(), bus_handle()} ].
-connect({<<Exchange/binary>>, <<Key/binary>>}) ->
- ?info("Opening ~p => ~p for consuming", [Exchange,Key]),
- Handle = bunny_farm:open(Exchange,Key),
+%% Publish with options
+%-spec connect({maybe_binary(), maybe_binary()}) -> [tuple()].
+connect({<<X/binary>>, Options}) when is_list(Options) ->
+ ?info("Opening ~p for publishing with options ~p", [X,Options]),
+ Handle = bunny_farm:open({X,Options}),
+ %error_logger:info_msg("[gen_qserver] Returning handle spec"),
+ [{id,X}, {tag,<<"">>}, {active,true}, {handle,Handle}];
+
+%% Consume with maybe options
+connect({MaybeX, MaybeK}) ->
+ ?info("Opening ~p => ~p for consuming", [MaybeX,MaybeK]),
+ Handle = bunny_farm:open(MaybeX,MaybeK),
Tag = tag(),
bunny_farm:consume(Handle, [{consumer_tag,Tag}]),
+ Exchange = case MaybeX of
+ {Exch,_Os} -> Exch;
+ Exch -> Exch
+ end,
%error_logger:info_msg("[gen_qserver] Returning handle spec"),
[{id,Exchange}, {tag,Tag}, {handle,Handle}];
-%% Consume
-connect({Exchange, Key}) ->
- connect({farm_tools:binarize(Exchange), farm_tools:binarize(Key)});
-
-%% Publish
-connect(<<Exchange/binary>>) ->
- ?info("Opening ~p for publishing", [Exchange]),
- Handle = bunny_farm:open(Exchange),
- %error_logger:info_msg("[gen_qserver] Returning handle spec"),
- [{id,Exchange}, {tag,<<"">>}, {active,true}, {handle,Handle}];
+%% Publish with no options
+connect(<<X/binary>>) -> connect({X,[]}).
-%% Publish
-connect(Exchange) ->
- connect(farm_tools:binarize(Exchange)).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% GEN_SERVER %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
View
@@ -11,6 +11,10 @@ main_test_() ->
fun set_value_normal/0,
fun get_value_queue/0,
fun set_value_queue/0,
+ fun explicit_encoding/0,
+ fun default_reply_exchange/0,
+ fun explicit_reply_exchange/0,
+ fun conn_spec_override/0,
fun get_connection/0
]
}.
@@ -33,15 +37,15 @@ set_value_normal() ->
?assertEqual(foo, Act).
get_value_queue() ->
- K = <<"gen_qserver_tests">>,
+ K = <<"get_value_queue">>,
PubBus = bunny_farm:open(<<"qserver.two">>),
SubBus = bunny_farm:open(<<"qserver.sub">>, K),
bunny_farm:consume(SubBus),
receive
#'basic.consume_ok'{consumer_tag=ConsumerTag} -> ok
end,
- ReplyTo = <<"qserver.sub:gen_qserver_tests">>,
+ ReplyTo = <<"qserver.sub:get_value_queue">>,
bunny_farm:rpc({get_value, key3}, ReplyTo, <<"key">>, PubBus),
error_logger:info_msg("[gen_qserver_tests] Waiting for response~n"),
@@ -56,25 +60,25 @@ get_value_queue() ->
set_value_queue() ->
- error_logger:info_msg("[gen_qserver_tests] Opening connections~n"),
- K = <<"gen_qserver_tests">>,
+ error_logger:info_msg("[set_value_queue] Opening connections~n"),
+ K = <<"set_value_queue">>,
PubBus = bunny_farm:open(<<"qserver.two">>),
SubBus = bunny_farm:open(<<"qserver.sub">>, K),
- error_logger:info_msg("[gen_qserver_tests] Consuming <<qserver.sub>>~n"),
+ error_logger:info_msg("[set_value_queue] Consuming <<qserver.sub>>~n"),
bunny_farm:consume(SubBus),
receive
#'basic.consume_ok'{consumer_tag=ConsumerTag} -> ok
end,
- error_logger:info_msg("[gen_qserver_tests] Sending set_value"),
+ error_logger:info_msg("[set_value_queue] Sending set_value"),
Message = #message{payload={set_value, key5, 5}, encoding=erlang},
bunny_farm:publish(Message, <<"key">>, PubBus),
- error_logger:info_msg("[gen_qserver_tests] Calling RPC to get_value"),
- ReplyTo = <<"qserver.sub:gen_qserver_tests">>,
+ error_logger:info_msg("[set_value_queue] Calling RPC to get_value"),
+ ReplyTo = <<"qserver.sub:set_value_queue">>,
bunny_farm:rpc({get_value, key5}, ReplyTo, <<"key">>, PubBus),
- error_logger:info_msg("[gen_qserver_tests] Waiting for response~n"),
+ error_logger:info_msg("[set_value_queue] Waiting for response~n"),
receive
{#'basic.deliver'{}, Content} ->
Act = farm_tools:decode_payload(Content)
@@ -85,6 +89,85 @@ set_value_queue() ->
?assertEqual(5, Act).
+explicit_encoding() ->
+ K = <<"explicit_encoding">>,
+ PubBus = bunny_farm:open(<<"qserver.two">>),
+ SubBus = bunny_farm:open(<<"">>, K),
+ bunny_farm:consume(SubBus),
+ receive
+ #'basic.consume_ok'{consumer_tag=ConsumerTag} -> ok
+ end,
+
+ ReplyTo = <<"explicit_encoding">>,
+ BsonBus = farm_tools:content_type(<<"application/bson">>, PubBus),
+ bunny_farm:rpc({get_value, key3}, ReplyTo, <<"key">>, BsonBus),
+
+ error_logger:info_msg("[explicit_encoding] Waiting for response~n"),
+ receive
+ {#'basic.deliver'{}, Content} ->
+ Act = farm_tools:decode_payload(Content)
+ end,
+
+ bunny_farm:close(SubBus, ConsumerTag),
+ bunny_farm:close(PubBus),
+ ?assertEqual(3, Act).
+
+
+default_reply_exchange() ->
+ K = <<"default_reply_exchange">>,
+ PubBus = bunny_farm:open(<<"qserver.two">>),
+ SubBus = bunny_farm:open(<<"">>, K),
+ bunny_farm:consume(SubBus),
+ receive
+ #'basic.consume_ok'{consumer_tag=ConsumerTag} -> ok
+ end,
+
+ ReplyTo = <<"default_reply_exchange">>,
+ bunny_farm:rpc({get_value, key3}, ReplyTo, <<"key">>, PubBus),
+
+ error_logger:info_msg("[default_reply_exchange] Waiting for response~n"),
+ receive
+ {#'basic.deliver'{}, Content} ->
+ Act = farm_tools:decode_payload(Content)
+ end,
+
+ bunny_farm:close(SubBus, ConsumerTag),
+ bunny_farm:close(PubBus),
+ ?assertEqual(3, Act).
+
+
+explicit_reply_exchange() ->
+ K = <<"explicit_reply_exchange">>,
+ PubBus = bunny_farm:open(<<"qserver.two">>),
+ SubBus = bunny_farm:open(<<"qserver.sub">>, K),
+ bunny_farm:consume(SubBus),
+ receive
+ #'basic.consume_ok'{consumer_tag=ConsumerTag} -> ok
+ end,
+
+ ReplyTo = <<"qserver.sub:explicit_reply_exchange">>,
+ bunny_farm:rpc({get_value, key3}, ReplyTo, <<"key">>, PubBus),
+
+ error_logger:info_msg("[explicit_reply_exchange] Waiting for response~n"),
+ receive
+ {#'basic.deliver'{}, Content} ->
+ Act = farm_tools:decode_payload(Content)
+ end,
+
+ bunny_farm:close(SubBus, ConsumerTag),
+ bunny_farm:close(PubBus),
+ ?assertEqual(3, Act).
+
+
+conn_spec_override() ->
+ Conn = my_qserver:get_connection(<<"qserver.three">>),
+ Handle = proplists:get_value(handle,Conn),
+ ?assertEqual(<<"qserver.three">>, proplists:get_value(id,Conn)),
+ ?assertEqual(bus_handle, element(1,Handle)),
+ Encoding = proplists:get_value(encoding,Handle#bus_handle.options),
+ ?assertEqual(<<"application/bson">>, Encoding).
+
+
get_connection() ->
Conn = my_qserver:get_connection(),
Handle = proplists:get_value(handle,Conn),
View
@@ -1,6 +1,7 @@
-module(my_qserver).
-behaviour(gen_qserver).
--export([get_value/1, set_value/2, get_connection/0]).
+-export([get_value/1, set_value/2,
+ get_connection/0, get_connection/1]).
-export([start_link/1, init/2,
handle_call/3,
handle_cast/2,
@@ -14,9 +15,14 @@ set_value(K,V) -> gen_qserver:cast(?MODULE, {set_value,K,V}).
%% For testing one of the cached connections
get_connection() -> gen_qserver:call(?MODULE, connection).
+get_connection(X) -> gen_qserver:call(?MODULE, {connection,X}).
start_link(TupleList) ->
- ConnSpecs = [<<"qserver.one">>, {<<"qserver.two">>,<<"key">>}],
+ ConnSpecs = [
+ <<"qserver.one">>,
+ {<<"qserver.two">>,<<"key">>},
+ {<<"qserver.three">>,[{encoding,<<"application/bson">>}]}
+ ],
gen_qserver:start_link({local,?MODULE}, ?MODULE, TupleList, [], ConnSpecs).
stop(Pid) ->
@@ -31,8 +37,11 @@ handle_call({<<B/binary>>, Args}, From, State) ->
error_logger:info_msg("[my_qserver] Got publish ~p => ~p~n", [B,Args]),
handle_call(Args, From, State);
-handle_call(connection, _From, State) ->
- Conn = qcache:get_conn(State#state.cache_pid, <<"qserver.two">>),
+handle_call(connection, From, State) ->
+ handle_call({connection, <<"qserver.two">>}, From, State);
+
+handle_call({connection,X}, _From, State) ->
+ Conn = qcache:get_conn(State#state.cache_pid, X),
{reply, Conn, State};
handle_call({get_value,K}, _From, State) ->
@@ -48,6 +57,8 @@ handle_cast({set_value,K,V}, State) ->
TupleList = lists:keystore(K,1,State#state.tuples, {K,V}),
{noreply, State#state{tuples=TupleList}};
+handle_cast(stop, State) -> {noreply,State};
+
handle_cast(A, State) ->
error_logger:info_msg("[my_qserver] Got unexpected cast: ~p~n", [A]),
{noreply, State}.

0 comments on commit 8760db4

Please sign in to comment.