Permalink
Browse files

Updated to RabbitMQ 2.6.1 and Riak client 1.2.0

  • Loading branch information...
1 parent a56d9bc commit e90a96cd8a554c57d44dec03fbbaab157d6cd861 @jbrisbin committed Oct 17, 2011
Showing with 18 additions and 21 deletions.
  1. +1 −2 README.md
  2. +6 −5 priv/send_msg.py
  3. BIN rebar
  4. +2 −2 rebar.config
  5. +1 −1 src/rabbit_exchange_type_riak.app.src
  6. +8 −11 src/rabbit_exchange_type_riak.erl
View
@@ -1,7 +1,6 @@
# RabbitMQ Riak Exchange
-Latest tagged version works with RabbitMQ 2.5.0. It should work for earlier versions of the broker by
-using the proper tag of rabbit_common.
+Latest tagged version works with RabbitMQ 2.6.1 and Riak 1.0.0 (Riak Erlang client ver 1.2.0).
This is a custom exchange type for RabbitMQ that will put any message sent to it into Riak.
By default, the Riak exchange will use your exchange name as the bucket name and your routing key as the Riak
View
@@ -5,13 +5,14 @@
conn = amqp.Connection()
ch = conn.channel()
+timestamp = datetime.now().strftime("%y%m%d%H%M%S%f")
msg = amqp.Message(
- "{\"now\": \"%s\"}" % (datetime.now().strftime("%d/%m/%y %H:%M")),
+ "{\"type\": \"presence\", \"user\": \"jonbrisbin\", \"timestamp\": \"%s\"}" % (datetime.now().strftime("%d/%m/%y %H:%M")),
content_type="application/json",
application_headers={
- "test": "header"
- # "X-Riak-Bucket": "test",
- # "X-Riak-Key": "test2"
+ "Location": "Chicago",
+ "X-Riak-Bucket": "auditlog",
+ "X-Riak-Key": timestamp
}
)
-ch.basic_publish(msg, "rtest", "rtest")
+ch.basic_publish(msg, "auditlog", timestamp)
View
BIN rebar
Binary file not shown.
View
@@ -1,6 +1,6 @@
{
deps, [
- {rabbit_common, ".*", {git, "https://github.com/jbrisbin/rabbit_common.git", {tag, "rabbitmq_2.5.0"}}},
- {riakc, ".*", {git, "https://github.com/basho/riak-erlang-client.git", "HEAD"}}
+ {rabbit_common, ".*", {git, "https://github.com/jbrisbin/rabbit_common.git", {tag, "rabbitmq_2.6.1"}}},
+ {riakc, ".*", {git, "https://github.com/basho/riak-erlang-client.git", {tag, "1.2.0"}}}
]
}.
@@ -1,7 +1,7 @@
{application, rabbit_exchange_type_riak,
[
{description, "RabbitMQ Riak Exchange Plugin"},
- {vsn, "0.1.5"},
+ {vsn, "0.1.6"},
{modules, []},
{registered, []},
{env, []},
@@ -63,10 +63,9 @@ delete(Tx, X, Bs) ->
Exchange = exchange_type(X),
Exchange:delete(Tx, X, Bs).
-add_binding(true, X, B) ->
- do_add_binding(X, B);
-add_binding(false, _X, _B) ->
- ok.
+add_binding(Tx, X, B) ->
+ Exchange = exchange_type(X),
+ Exchange:add_binding(Tx, X, B).
remove_bindings(Tx, X, Bs) ->
Exchange = exchange_type(X),
@@ -94,7 +93,7 @@ route(X=#exchange{name = #resource{virtual_host = _VirtualHost, name = Name}},
Payload = lists:foldl(fun(Chunk, NewPayload) ->
<<Chunk/binary, NewPayload/binary>>
end, <<>>, PayloadRev),
- io:format("payload: ~p~n", [Payload]),
+ % io:format("payload: ~p~n", [Payload]),
lists:foldl(fun(Route, _) ->
% Look for bucket from headers or default to exchange name
@@ -109,14 +108,15 @@ route(X=#exchange{name = #resource{virtual_host = _VirtualHost, name = Name}},
end,
% Insert or update everything
- io:format("storing message to url: /~s/~s~n", [Bucket, Key]),
+ io:format("storing message: /~s/~s as ~s~n", [Bucket, Key, ContentType]),
Obj0 = case riakc_pb_socket:get(Client, Bucket, Key) of
{ok, OldObj} -> riakc_obj:update_value(OldObj, Payload, binary_to_list(ContentType));
_ -> riakc_obj:new(Bucket, Key, Payload, binary_to_list(ContentType))
end,
% Populate metadata from msg properties
- Obj1 = case lists:foldl(fun({PropKey, _Type, PropVal}, NewProps) ->
+ Obj1 = case lists:foldl(fun({PropKey, PropType, PropVal}, NewProps) ->
+ io:format("key, type, val= (~p, ~p, ~p)~n", [PropKey, PropType, PropVal]),
case PropKey of
<<"X-Riak-Bucket", _/binary>> -> NewProps;
<<"X-Riak-Key", _/binary>> -> NewProps;
@@ -138,10 +138,6 @@ route(X=#exchange{name = #resource{virtual_host = _VirtualHost, name = Name}},
Exchange = exchange_type(X),
Exchange:route(X, D).
-do_add_binding(X, B) ->
- Exchange = exchange_type(X),
- Exchange:add_binding(true, X, B).
-
exchange_a(#exchange{name = #resource{virtual_host=VirtualHost, name=Name}}) ->
list_to_atom(lists:flatten(io_lib:format("~s ~s", [VirtualHost, Name]))).
@@ -195,6 +191,7 @@ create_riak_client(XA, Host, Port, MaxClients) ->
exchange_type(#exchange{ arguments=Args }) ->
case lists:keyfind(?TYPE, 1, Args) of
{?TYPE, _, Type} ->
+ %io:format("found type ~p~n", [Type]),
case list_to_atom(binary_to_list(Type)) of
rabbit_exchange_type_riak ->
error_logger:error_report("Cannot base a Riak exchange on a Riak exchange. An infinite loop would occur."),

0 comments on commit e90a96c

Please sign in to comment.