Permalink
Browse files

Initial commit

  • Loading branch information...
0 parents commit be4be38e5d1b46035d6b813e428771a9f9a04ed5 @jbrisbin committed Apr 15, 2011
Showing with 210 additions and 0 deletions.
  1. +3 −0 .gitignore
  2. +37 −0 Makefile
  3. +48 −0 README.md
  4. BIN rebar
  5. +11 −0 rebar.config
  6. +10 −0 src/riak_rabbitmq.app.src
  7. +101 −0 src/riak_rabbitmq.erl
@@ -0,0 +1,3 @@
+.DS_Store
+ebin
+deps
@@ -0,0 +1,37 @@
+PACKAGE=riak-rabbitmq-postcommit-hook
+DIST_DIR=dist
+EBIN_DIR=ebin
+INCLUDE_DIRS=include
+DEPS_DIR=deps
+DEPS ?= amqp_client
+DEPS_EZ=$(foreach DEP, $(DEPS), $(DEPS_DIR)/$(DEP).ez)
+RABBITMQ_HOME ?= .
+
+all: compile
+
+clean:
+ rm -rf $(DIST_DIR)
+ rm -rf $(EBIN_DIR)
+
+distclean: clean
+ rm -rf $(DEPS_DIR)
+
+package: compile $(DEPS_EZ)
+ rm -f $(DIST_DIR)/$(PACKAGE).ez
+ mkdir -p $(DIST_DIR)/$(PACKAGE)
+ cp -r $(EBIN_DIR) $(DIST_DIR)/$(PACKAGE)
+ $(foreach EXTRA_DIR, $(INCLUDE_DIRS), cp -r $(EXTRA_DIR) $(DIST_DIR)/$(PACKAGE);)
+ (cd $(DIST_DIR); zip -r $(PACKAGE).ez $(PACKAGE))
+
+install: package
+ $(foreach DEP, $(DEPS_EZ), cp $(DEP) $(RABBITMQ_HOME)/plugins;)
+ cp $(DIST_DIR)/$(PACKAGE).ez $(RABBITMQ_HOME)/plugins
+
+$(DEPS_DIR):
+ ./rebar get-deps
+
+$(DEPS_EZ):
+ cd $(DEPS_DIR); $(foreach DEP, $(DEPS), zip -r $(DEP).ez $(DEP);)
+
+compile: $(DEPS_DIR)
+ ./rebar compile
@@ -0,0 +1,48 @@
+# Riak RabbitMQ postcommit hook
+
+This is a postcommit hook that sends entries into a RabbitMQ broker using the
+Erlang AMQP client.
+
+You set this module/function as your postcommit hook using whatever tools you're
+used to. Your bucket properties should look something like this:
+
+ {
+ "props":{
+ "postcommit":[{"mod":"riak_rabbitmq","fun":"postcommit_send_amqp"}],
+ ... other props ...
+ }
+ }
+
+### Installation
+
+To install this module, clone the source, build, then symlink the required
+dependencies into your Riak install:
+
+ git clone git://github.com/jbrisbin/riak-rabbitmq-commit-hooks.git
+ cd riak-rabbitmq-commit-hooks
+ ./rebar get-deps
+ make
+ [...wait for a long time while spidermonkey builds...]
+ cd $RIAK_LIBS
+ ln -s $BUILD_DIR riak_rabbitmq-0.1.0
+ ln -s $BUILD_DIR/deps/amqp_client amqp_client-2.4.1
+ ln -s $BUILD_DIR/deps/rabbit_common rabbit_common-2.4.1
+
+This should expose the module and the right dependencies to your Riak server so
+that your postcommit hook will actually work.
+
+### Configuration
+
+To tell the commit hook where to send your entry in the form of an AMQP message,
+you can pass special metadata properties to influence the commit hook's behaviour.
+The list of acceptable properties is pretty self-explanatory:
+
+* `X-Riak-Meta-AMQP-Exchange`
+* `X-Riak-Meta-AMQP-Routing-Key`
+* `X-Riak-Meta-AMQP-Host`
+* `X-Riak-Meta-AMQP-Port`
+* `X-Riak-Meta-AMQP-VHost`
+* `X-Riak-Meta-AMQP-User`
+* `X-Riak-Meta-AMQP-Password`
+
+This utility is Apache licensed, just like Riak.
BIN rebar
Binary file not shown.
@@ -0,0 +1,11 @@
+{require_otp_vsn, "R13B04|R14"}.
+{cover_enabled, true}.
+{erl_opts, [debug_info, fail_on_warning]}.
+
+{erl_first_files, ["src/riak_rabbitmq.erl"]}.
+
+{deps,
+ [
+ {amqp_client, ".*", {git, "https://github.com/jbrisbin/amqp_client.git", "master"}},
+ {riak_kv, ".*", {git, "https://github.com/basho/riak_kv.git", "master"}}
+ ]}.
@@ -0,0 +1,10 @@
+{application, riak_rabbitmq,
+ [
+ {description, "Riak RabbitMQ post commit hook"},
+ {vsn, "1.0"},
+ {modules, []},
+ {registered, []},
+ {env, []},
+ {applications, [kernel, stdlib]}
+ ]
+}.
@@ -0,0 +1,101 @@
+-module(riak_rabbitmq).
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-export([
+ postcommit_send_amqp/1
+]).
+
+-define(DELETED, <<"X-Riak-Deleted">>).
+-define(META, <<"X-Riak-Meta">>).
+-define(EXCHANGE, <<"X-Riak-Meta-AMQP-Exchange">>).
+-define(ROUTING_KEY, <<"X-Riak-Meta-AMQP-Routing-Key">>).
+-define(HOST, <<"X-Riak-Meta-AMQP-Host">>).
+-define(PORT, <<"X-Riak-Meta-AMQP-Port">>).
+-define(VHOST, <<"X-Riak-Meta-AMQP-VHost">>).
+-define(USER, <<"X-Riak-Meta-AMQP-User">>).
+-define(PASSWORD, <<"X-Riak-Meta-AMQP-Password">>).
+
+-define(CONTENT_TYPE, <<"content-type">>).
+
+postcommit_send_amqp(RObj) ->
+ % io:format("robj: ~p~n", [RObj]),
+ Metadata = riak_object:get_metadata(RObj),
+ % io:format("metadata: ~p~n", [Metadata]),
+
+ Exchange = key_find(?EXCHANGE, Metadata, riak_object:bucket(RObj)),
+ RoutingKey = key_find(?ROUTING_KEY, Metadata, riak_object:key(RObj)),
+ ContentType = list_to_binary(key_find(?CONTENT_TYPE, Metadata, "application/octet-stream")),
+ Body = riak_object:get_value(RObj),
+
+ % Publish a message
+ Publish = #'basic.publish'{ exchange=Exchange, routing_key=RoutingKey },
+ Headers = key_find(?META, Metadata, []),
+ AppHdrs = lists:foldl(fun ({HdrKey, HdrValue}, NewHdrs) ->
+ <<_:12/binary, Key/binary>> = list_to_binary(HdrKey),
+ [{binary_to_list(Key), binary, HdrValue} | NewHdrs]
+ end, [], Headers),
+ ExtraHdrs = [case key_find(?DELETED, Metadata, "false") of
+ "true" -> {"X-Riak-Deleted", binary, "true"};
+ "false" -> []
+ end],
+ Props = #'P_basic'{ content_type=ContentType, headers=lists:flatten([AppHdrs, ExtraHdrs]) },
+
+ Msg = #amqp_msg{ payload=Body, props=Props },
+ % io:format("message: ~p~n", [Msg]),
+ {ok, Channel} = amqp_channel(RObj),
+ amqp_channel:cast(Channel, Publish, Msg),
+
+ %ObjJson = mochijson2:encode(riak_object:to_json(RObj)),
+ % io:format("sent object: ~p~n", [ObjJson]),
+
+ ok.
+
+amqp_channel(RObj) ->
+ AmqpParams = amqp_p(RObj),
+ case pg2:get_closest_pid(AmqpParams) of
+ {error, {no_such_group, _}} ->
+ pg2:create(AmqpParams),
+ amqp_channel(RObj);
+ {error, {no_process, _}} ->
+ case amqp_connection:start(network, AmqpParams) of
+ {ok, Client} ->
+ % io:format("started new client: ~p~n", [Client]),
+ case amqp_connection:open_channel(Client) of
+ {ok, Channel} ->
+ pg2:join(AmqpParams, Channel),
+ {ok, Channel};
+ {error, Reason} -> {error, Reason}
+ end;
+ {error, Reason} -> {error, Reason}
+ end;
+ Channel ->
+ % io:format("using existing channel: ~p~n", [Channel]),
+ {ok, Channel}
+ end.
+
+amqp_p(RObj) ->
+ Metadata = riak_object:get_metadatas(RObj),
+
+ Host = find(?HOST, Metadata, <<"127.0.0.1">>),
+ Port = find(?PORT, Metadata, 5672),
+ Vhost = find(?VHOST, Metadata, <<"/">>),
+ User = find(?USER, Metadata, <<"guest">>),
+ Pass = find(?PASSWORD, Metadata, <<"guest">>),
+
+ #amqp_params{username = User,
+ password = Pass,
+ virtual_host = Vhost,
+ host = binary_to_list(Host),
+ port = Port}.
+
+find(K, L, Default) ->
+ case lists:keyfind(K, 1, L) of
+ {K, V} -> V;
+ _ -> Default
+ end.
+
+key_find(K, D, Default) ->
+ case dict:find(K, D) of
+ {ok, V} -> V;
+ _ -> Default
+ end.

0 comments on commit be4be38

Please sign in to comment.