Skip to content
Browse files

Tweaked README, cleaned formatting

  • Loading branch information...
1 parent be4be38 commit 0d3e3ab28aebc0d8316622efc78c3761ede03c1a @jbrisbin committed Apr 15, 2011
Showing with 18 additions and 14 deletions.
  1. +4 −0 README.md
  2. +14 −14 src/riak_rabbitmq.erl
View
4 README.md
@@ -15,6 +15,10 @@ used to. Your bucket properties should look something like this:
### Installation
+I haven't had time to create a binary download yet. In the interest of early
+adoption, you're free to build and install from source. But if you can wait
+a day or two, I'll have a binary build up. :)
+
To install this module, clone the source, build, then symlink the required
dependencies into your Riak install:
View
28 src/riak_rabbitmq.erl
@@ -18,35 +18,35 @@
-define(CONTENT_TYPE, <<"content-type">>).
postcommit_send_amqp(RObj) ->
- % io:format("robj: ~p~n", [RObj]),
+ % io:format("robj: ~p~n", [RObj]),
Metadata = riak_object:get_metadata(RObj),
- % io:format("metadata: ~p~n", [Metadata]),
+ % io:format("metadata: ~p~n", [Metadata]),
Exchange = key_find(?EXCHANGE, Metadata, riak_object:bucket(RObj)),
RoutingKey = key_find(?ROUTING_KEY, Metadata, riak_object:key(RObj)),
ContentType = list_to_binary(key_find(?CONTENT_TYPE, Metadata, "application/octet-stream")),
Body = riak_object:get_value(RObj),
- % Publish a message
+ % Publish a message
Publish = #'basic.publish'{ exchange=Exchange, routing_key=RoutingKey },
Headers = key_find(?META, Metadata, []),
AppHdrs = lists:foldl(fun ({HdrKey, HdrValue}, NewHdrs) ->
- <<_:12/binary, Key/binary>> = list_to_binary(HdrKey),
- [{binary_to_list(Key), binary, HdrValue} | NewHdrs]
- end, [], Headers),
+ <<_:12/binary, Key/binary>> = list_to_binary(HdrKey),
+ [{binary_to_list(Key), binary, HdrValue} | NewHdrs]
+ end, [], Headers),
ExtraHdrs = [case key_find(?DELETED, Metadata, "false") of
- "true" -> {"X-Riak-Deleted", binary, "true"};
- "false" -> []
- end],
+ "true" -> {"X-Riak-Deleted", binary, "true"};
+ "false" -> []
+ end],
Props = #'P_basic'{ content_type=ContentType, headers=lists:flatten([AppHdrs, ExtraHdrs]) },
Msg = #amqp_msg{ payload=Body, props=Props },
- % io:format("message: ~p~n", [Msg]),
+ % io:format("message: ~p~n", [Msg]),
{ok, Channel} = amqp_channel(RObj),
amqp_channel:cast(Channel, Publish, Msg),
- %ObjJson = mochijson2:encode(riak_object:to_json(RObj)),
- % io:format("sent object: ~p~n", [ObjJson]),
+ %ObjJson = mochijson2:encode(riak_object:to_json(RObj)),
+ % io:format("sent object: ~p~n", [ObjJson]),
ok.
@@ -59,7 +59,7 @@ amqp_channel(RObj) ->
{error, {no_process, _}} ->
case amqp_connection:start(network, AmqpParams) of
{ok, Client} ->
- % io:format("started new client: ~p~n", [Client]),
+ % io:format("started new client: ~p~n", [Client]),
case amqp_connection:open_channel(Client) of
{ok, Channel} ->
pg2:join(AmqpParams, Channel),
@@ -69,7 +69,7 @@ amqp_channel(RObj) ->
{error, Reason} -> {error, Reason}
end;
Channel ->
- % io:format("using existing channel: ~p~n", [Channel]),
+ % io:format("using existing channel: ~p~n", [Channel]),
{ok, Channel}
end.

0 comments on commit 0d3e3ab

Please sign in to comment.
Something went wrong with that request. Please try again.