Permalink
Browse files

Merge pull request #9 from superbobry/master

Dialyzer compatibility changes
  • Loading branch information...
2 parents f16b180 + 82cb3ee commit af82eae0b9b24b63065cb0e3cbc60f8ac7603c97 Brian Lee Yung Rowe committed Jul 6, 2012
Showing with 46 additions and 40 deletions.
  1. +1 −1 rebar.config
  2. +30 −24 src/bunny_farm.erl
  3. +15 −15 src/farm_tools.erl
View
@@ -8,5 +8,5 @@
{rabbit_common, ".*",
{git, "git://github.com/muxspace/rabbit_common.git", "master"}},
{'bson', ".*",
- {git, "https://github.com/muxspace/bson_erlang.git", "master"}}
+ {git, "https://github.com/superbobry/bson-erlang.git", "master"}}
]}.
View
@@ -1,11 +1,11 @@
%% Copyright 2011 Brian Lee Yung Rowe
-%%
+%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
-%%
+%%
%% http://www.apache.org/licenses/LICENSE-2.0
-%%
+%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,13 +21,13 @@
declare_queue/1, declare_queue/2, declare_queue/3,
bind/3]).
-export([consume/1, consume/2,
- publish/3,
+ publish/3,
rpc/3, rpc/4, respond/3]).
-ifdef(TEST).
-compile(export_all).
-endif.
-%% Convenience function for opening a connection for publishing
+%% Convenience function for opening a connection for publishing
%% messages. The routing key can be included but if it is not,
%% then the connection can be re-used for multiple routing keys
%% on the same exchange.
@@ -101,7 +101,7 @@ publish(#message{payload=Payload, props=Props}, K,
ContentType = {content_type,MimeType},
AProps = farm_tools:to_amqp_props(lists:merge([ContentType], Props)),
AMsg = #amqp_msg{payload=EncPayload, props=AProps},
- BasicPublish = #'basic.publish'{exchange=X, routing_key=K},
+ BasicPublish = #'basic.publish'{exchange=X, routing_key=K},
amqp_channel:cast(Channel, BasicPublish, AMsg);
publish(Payload, RoutingKey, #bus_handle{}=BusHandle) ->
@@ -119,23 +119,23 @@ rpc(#message{payload=Payload, props=Props}, K,
AProps = farm_tools:to_amqp_props(lists:merge([ContentType], Props)),
AMsg = #amqp_msg{payload=farm_tools:encode_payload(MimeType,Payload),
props=AProps},
- BasicPublish = #'basic.publish'{exchange=X, routing_key=K},
+ BasicPublish = #'basic.publish'{exchange=X, routing_key=K},
amqp_channel:cast(Channel, BasicPublish, AMsg).
rpc(Payload, ReplyTo, K, BusHandle) ->
Props = [{reply_to,ReplyTo}, {correlation_id,ReplyTo}],
rpc(#message{payload=Payload, props=Props}, K, BusHandle).
-%% This is used to send the response of an RPC. The primary difference
+%% This is used to send the response of an RPC. The primary difference
%% between this and publish is that the data is retained as an erlang
%% binary.
respond(#message{payload=Payload, props=Props}, RoutingKey,
#bus_handle{exchange=X,channel=Channel}) ->
MimeType = farm_tools:content_type(Props),
AMsg = #amqp_msg{payload=farm_tools:encode_payload(MimeType,Payload),
props=farm_tools:to_amqp_props(Props)},
- BasicPublish = #'basic.publish'{exchange=X, routing_key=RoutingKey},
+ BasicPublish = #'basic.publish'{exchange=X, routing_key=RoutingKey},
amqp_channel:cast(Channel, BasicPublish, AMsg);
respond(Payload, RoutingKey, #bus_handle{}=BusHandle) ->
@@ -161,7 +161,7 @@ declare_exchange(Key, #bus_handle{}=BusHandle) ->
declare_exchange(BusHandle#bus_handle{exchange=Key}).
%% http://www.rabbitmq.com/amqp-0-9-1-quickref.html
-%% Use configured options for the queue. Since no routing key is specified,
+%% Use configured options for the queue. Since no routing key is specified,
%% attempt to read options for the routing key <<"">>.
declare_queue(#bus_handle{}=BusHandle) ->
declare_queue(BusHandle, queue_options(<<"">>)).
@@ -177,7 +177,7 @@ declare_queue(Key, #bus_handle{channel=Channel}, Options) ->
Q.
-
+
bind(Q, _BindKey, #bus_handle{exchange= <<"">>}=BusHandle) ->
BusHandle#bus_handle{queue=Q};
@@ -203,22 +203,30 @@ open_it(Method, #amqp_params{}=Params, #bus_handle{}=BusHandle) ->
default(Key) ->
- D = [ {amqp_username, <<"guest">>},
- {amqp_password, <<"guest">>},
- {amqp_virtual_host, <<"/">>},
- {amqp_servers, []}, % Format is {host,port}
- {amqp_host, "localhost"},
- {amqp_port, 5672},
- {amqp_encoding, <<"application/x-erlang">>},
- {amqp_exchanges, []},
- {amqp_queues, []} ],
- proplists:get_value(Key,D).
+ Defaults = [{amqp_username, <<"guest">>},
+ {amqp_password, <<"guest">>},
+ {amqp_virtual_host, <<"/">>},
+ {amqp_servers, []}, % Format is {host,port}
+ {amqp_host, "localhost"},
+ {amqp_port, 5672},
+ {amqp_encoding, <<"application/x-erlang">>},
+ {amqp_exchanges, []},
+ {amqp_queues, []}],
+
+ %% Note(superbobry): try fetching the value from 'bunny_farm'
+ %% environment first, this might be useful for example when all
+ %% applications use a single RabbitMQ instance.
+ case application:get_env(bunny_farm, Key) of
+ {ok, Value} -> Value;
+ undefined ->
+ proplists:get_value(Key, Defaults)
+ end.
get_env(Key) ->
Default = default(Key),
case application:get_env(Key) of
undefined -> Default;
- {ok,H} -> H
+ {ok, H} -> H
end.
%% If amqp_servers is defined, use that. Otherwise fall back to amqp_host and
@@ -274,5 +282,3 @@ resolve_options(queue, MaybeTuple) ->
K -> Os = lists:merge(queue_options(K),Defaults)
end,
{K,Os}.
-
-
View
@@ -1,11 +1,11 @@
%% Copyright 2011 Brian Lee Yung Rowe
-%%
+%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
-%%
+%%
%% http://www.apache.org/licenses/LICENSE-2.0
-%%
+%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -14,7 +14,7 @@
-module(farm_tools).
-include("bunny_farm.hrl").
--export([decode_properties/1,
+-export([decode_properties/1,
decode_payload/1, decode_payload/2,
encode_payload/1, encode_payload/2]).
-export([to_list/1, atomize/1, atomize/2, listify/1, listify/2]).
@@ -24,7 +24,7 @@
to_amqp_props/1,
to_basic_consume/1,
is_rpc/1,
- reply_to/1, reply_to/2,
+ reply_to/1, reply_to/2,
content_type/1, content_type/2,
encoding/1,
correlation_id/1 ]).
@@ -38,12 +38,12 @@ decode_properties(#amqp_msg{props=Properties}) ->
decode_payload(#amqp_msg{payload=Payload}=Content) ->
decode_payload(content_type(Content), Payload);
-
+
decode_payload(Payload) -> decode_payload(bson, Payload).
decode_payload(none, Payload) -> Payload;
decode_payload(<<"application/octet-stream">>, Payload) -> Payload;
-decode_payload({_E,M,F}, Payload) -> {M,F}(Payload);
+decode_payload({_E,M,F}, Payload) -> M:F(Payload);
decode_payload(<<"application/x-erlang">>, Payload) ->
decode_payload(erlang, Payload);
@@ -53,7 +53,7 @@ decode_payload(<<"application/bson">>, Payload) ->
decode_payload(bson, Payload) ->
try
{Doc,_Bin} = bson_binary:get_document(Payload),
- bson:reflate(Doc)
+ bson:fields(Doc)
catch
error:{badmatch,_} -> decode_payload(erlang, Payload);
error:function_clause -> decode_payload(erlang, Payload)
@@ -63,7 +63,7 @@ encode_payload(Payload) -> encode_payload(bson, Payload).
encode_payload(none, Payload) -> Payload;
encode_payload(<<"application/octet-stream">>, Payload) -> Payload;
-encode_payload({_E,M,F}, Payload) -> {M,F}(Payload);
+encode_payload({_E,M,F}, Payload) -> M:F(Payload);
encode_payload(<<"application/x-erlang">>, Payload) ->
encode_payload(erlang, Payload);
@@ -97,9 +97,9 @@ listify(List) when is_list(List) ->
listify(List, Sep) when is_list(List) ->
[H|T] = List,
lists:foldl(fun(X,Y) -> Y ++ Sep ++ to_list(X) end, to_list(H), T).
-
+
%% Convenience function to convert values to binary strings. Useful for
-%% creating binary names for exchanges or routing keys. Not recommended
+%% creating binary names for exchanges or routing keys. Not recommended
%% for payloads.
binarize(Binary) when is_binary(Binary) -> Binary;
@@ -118,7 +118,7 @@ to_exchange_declare(Props) ->
%% This is a safety in case certain arguments aren't set elsewhere
Defaults = [ {ticket,0}, {arguments,[]} ],
Enriched = lists:merge(Props, Defaults),
- list_to_tuple(['exchange.declare'|[proplists:get_value(X,Enriched,false) ||
+ list_to_tuple(['exchange.declare'|[proplists:get_value(X,Enriched,false) ||
X <- record_info(fields,'exchange.declare')]]).
%% Converts a tuple list of values to a queue.declare record
@@ -127,21 +127,21 @@ to_queue_declare(Props) ->
%% This is a safety in case certain arguments aren't set elsewhere
Defaults = [ {ticket,0}, {arguments,[]} ],
Enriched = lists:merge(Props, Defaults),
- list_to_tuple(['queue.declare'|[proplists:get_value(X,Enriched,false) ||
+ list_to_tuple(['queue.declare'|[proplists:get_value(X,Enriched,false) ||
X <- record_info(fields,'queue.declare')]]).
%% Converts a tuple list to a basic.consume record
-spec to_basic_consume([{atom(), term()}]) -> #'basic.consume'{}.
to_basic_consume(Props) ->
Defaults = [ {ticket,0}, {arguments,[]}, {consumer_tag,<<"">>} ],
Enriched = lists:merge(Props, Defaults),
- list_to_tuple(['basic.consume'|[proplists:get_value(X,Enriched,false) ||
+ list_to_tuple(['basic.consume'|[proplists:get_value(X,Enriched,false) ||
X <- record_info(fields,'basic.consume')]]).
%% Converts a tuple list of values to amqp_msg properties (P_basic)
-spec to_amqp_props([{atom(), term()}]) -> #'P_basic'{}.
to_amqp_props(Props) ->
- list_to_tuple(['P_basic'|[proplists:get_value(X,Props) ||
+ list_to_tuple(['P_basic'|[proplists:get_value(X,Props) ||
X <- record_info(fields,'P_basic')]]).
is_rpc(#amqp_msg{props=Props}) ->

0 comments on commit af82eae

Please sign in to comment.