Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Initial commit

  • Loading branch information...
commit 0b07348987a552a5b7768e1bfcd18591d0928b97 0 parents
@jbrisbin authored
4 .gitignore
@@ -0,0 +1,4 @@
+.DS_Store
+deps
+ebin
+dist
37 Makefile
@@ -0,0 +1,37 @@
+PACKAGE=riak-exchange
+DIST_DIR=dist
+EBIN_DIR=ebin
+INCLUDE_DIRS=include
+DEPS_DIR=deps
+DEPS ?= riakc protobuffs
+DEPS_EZ=$(foreach DEP, $(DEPS), $(DEPS_DIR)/$(DEP).ez)
+RABBITMQ_HOME ?= .
+
+all: compile
+
+clean:
+ rm -rf $(DIST_DIR)
+ rm -rf $(EBIN_DIR)
+
+distclean: clean
+ rm -rf $(DEPS_DIR)
+
+package: compile $(DEPS_EZ)
+ rm -f $(DIST_DIR)/$(PACKAGE).ez
+ mkdir -p $(DIST_DIR)/$(PACKAGE)
+ cp -r $(EBIN_DIR) $(DIST_DIR)/$(PACKAGE)
+ $(foreach EXTRA_DIR, $(INCLUDE_DIRS), cp -r $(EXTRA_DIR) $(DIST_DIR)/$(PACKAGE);)
+ (cd $(DIST_DIR); zip -r $(PACKAGE).ez $(PACKAGE))
+
+install: package
+ $(foreach DEP, $(DEPS_EZ), cp $(DEP) $(RABBITMQ_HOME)/plugins;)
+ cp $(DIST_DIR)/$(PACKAGE).ez $(RABBITMQ_HOME)/plugins
+
+$(DEPS_DIR):
+ ./rebar get-deps
+
+$(DEPS_EZ):
+ cd $(DEPS_DIR); $(foreach DEP, $(DEPS), zip -r $(DEP).ez $(DEP);)
+
+compile: $(DEPS_DIR)
+ ./rebar compile
42 README.md
@@ -0,0 +1,42 @@
+# RabbitMQ Riak Exchange
+
+This is a custom exchange type for RabbitMQ that will put any message sent to it into Riak.
+The name of your exchange becomes the bucket and the routing key you use will become the object key.
+
+## Installation
+
+To install from source:
+
+ git clone https://github.com/jbrisbin/riak-exchange
+ cd riak-exchange
+ make deps
+ make
+ make package
+ cp deps/*.ez $RABBITMQ_HOME/plugins
+ cp dist/*.ez $RABBITMQ_HOME/plugins
+
+I've also put up a tar file of the required .ez files you need to install in your RabbitMQ's plugins directory.
+
+## Configuration
+
+To use the Riak exchange type, declare your exchange as type "x-riak". In addition to forwarding messages to
+Riak, this is also a topic exchange so you can have consumers bound to this exchange and they will receive
+the messages as well as going to Riak.
+
+To configure what Riak server to connect to, pass some arguments to the exchange declaration:
+
+* `host` - Hostname or IP of the Riak server to connect to.
+* `port` - Port number of the Riak server to connect to.
+* `maxclients` - The maximum number of clients to create in the pool (use more clients for higher-traffic exchanges).
+
+## Metadata
+
+#### Content-Type
+
+Whatever Content-Type you set on the message will be the Content-Type used for Riak.
+
+#### Headers
+
+The Riak exchange will translate your custom AMQP message headers into the special `X-Riak-Meta-` headers required
+by Riak. For example, if you set a custom header with the name `customheader`, your Riak document will have a
+header named `X-Riak-Meta-customheader`.
2  include/riak_exchange.hrl
@@ -0,0 +1,2 @@
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit_framing.hrl").
15 priv/send_lots_of_msgs.py
@@ -0,0 +1,15 @@
+#!/usr/bin/env python
+import amqplib.client_0_8 as amqp
+
+conn = amqp.Connection()
+ch = conn.channel()
+
+for i in range(1000):
+ msg = amqp.Message(
+ "Message #%s" % i,
+ content_type="text/plain",
+ application_headers={
+ "test": "header"
+ }
+ )
+ ch.basic_publish(msg, "riak", "msg%s" % i)
14 priv/send_msg.py
@@ -0,0 +1,14 @@
+#!/usr/bin/env python
+import amqplib.client_0_8 as amqp
+
+conn = amqp.Connection()
+ch = conn.channel()
+
+msg = amqp.Message(
+ "Hello World!",
+ content_type="text/plain",
+ application_headers={
+ "test": "header"
+ }
+)
+ch.basic_publish(msg, "riak", "test")
BIN  rebar
Binary file not shown
6 rebar.config
@@ -0,0 +1,6 @@
+{
+ deps, [
+ {rabbit_common, ".*", {git, "https://github.com/jbrisbin/rabbit_common.git", "HEAD"}},
+ {riakc, ".*", {git, "https://github.com/basho/riak-erlang-client.git", "HEAD"}}
+ ]
+}.
10 src/riak_exchange.app.src
@@ -0,0 +1,10 @@
+{application, riak_exchange,
+ [
+ {description, "RabbitMQ Riak Exchange Plugin"},
+ {vsn, "1.0"},
+ {modules, []},
+ {registered, []},
+ {env, []},
+ {applications, [kernel, stdlib, rabbit, mnesia]}
+ ]
+}.
169 src/riak_exchange.erl
@@ -0,0 +1,169 @@
+-module(riak_exchange).
+-include("riak_exchange.hrl").
+-behaviour(rabbit_exchange_type).
+
+-define(EXCHANGE_TYPE_BIN, <<"x-riak">>).
+-define(HOST, <<"host">>).
+-define(PORT, <<"port">>).
+-define(MAX_CLIENTS, <<"maxclients">>).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "exchange type riak"},
+ {mfa, {rabbit_registry, register, [exchange, ?EXCHANGE_TYPE_BIN, ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
+
+-export([
+ description/0,
+ route/2
+]).
+-export([
+ validate/1,
+ create/2,
+ recover/2,
+ delete/3,
+ add_binding/3,
+ remove_bindings/3,
+ assert_args_equivalence/2
+]).
+
+description() ->
+ [{name, ?EXCHANGE_TYPE_BIN}, {description, <<"Experimental Riak Exchange">>}].
+
+validate(X) ->
+ rabbit_exchange_type_topic:validate(X).
+
+create(Tx, X = #exchange{name = #resource{virtual_host=_VirtualHost, name=_Name}, arguments = _Args}) ->
+ XA = exchange_a(X),
+ pg2:create(XA),
+
+ %io:format("create()~n x: ~p~n name: ~p~n args: ~p~n", [X, XA, Args]),
+ case get_riak_client(X) of
+ {ok, _Client} ->
+ %io:format("got client: ~p~n", [Client]),
+ rabbit_exchange_type_topic:create(Tx, X);
+ _ ->
+ error_logger:error_msg("Could not connect to Riak"),
+ {error, "could not connect to riak"}
+ end.
+
+recover(X, _Bs) ->
+ create(none, X).
+
+delete(Tx, X, Bs) ->
+ XA = exchange_a(X),
+ pg2:delete(XA),
+ rabbit_exchange_type_topic:delete(Tx, X, Bs).
+
+add_binding(true, X, B) ->
+ do_add_binding(X, B);
+add_binding(false, _X, _B) ->
+ ok.
+
+remove_bindings(Tx, X, Bs) ->
+ rabbit_exchange_type_topic:remove_bindings(Tx, X, Bs).
+
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
+
+route(X=#exchange{name = #resource{virtual_host = _VirtualHost, name = Name}},
+ D=#delivery{message = _Message0 = #basic_message{
+ routing_keys = Routes,
+ content = Content0}}) ->
+ #content{
+ properties = _Props = #'P_basic'{
+ content_type = ContentType,
+ headers = Headers,
+ reply_to = _ReplyTo
+ },
+ payload_fragments_rev = PayloadRev
+ } = rabbit_binary_parser:ensure_content_decoded(Content0),
+
+ case get_riak_client(X) of
+ {ok, Client} ->
+ %io:format("got client: ~p~n", [Client]),
+ % Convert payload to list, concat together
+ Payload = lists:foldl(fun(Chunk, NewPayload) ->
+ <<Chunk/binary, NewPayload/binary>>
+ end, <<>>, PayloadRev),
+
+ lists:foldl(fun(Key, _) ->
+ Obj0 = case riakc_pb_socket:get(Client, Name, Key) of
+ {ok, OldObj} -> riakc_obj:update_value(OldObj, Payload);
+ _ -> riakc_obj:new(Name, Key, Payload, binary_to_list(ContentType))
+ end,
+
+ % Populate metadata from msg properties
+ Obj1 = case lists:foldl(fun({PropKey, _Type, PropVal}, NewProps) ->
+ [{<<"X-Riak-Meta-", PropKey/binary>>, PropVal} | NewProps]
+ end, [], Headers) of
+ [] -> Obj0;
+ CMeta -> riakc_obj:update_metadata(Obj0, dict:store(<<"X-Riak-Meta">>, CMeta, riakc_obj:get_update_metadata(Obj0)))
+ end,
+
+ % Insert data
+ %io:format("new obj: ~p~n", [Obj1]),
+ riakc_pb_socket:put(Client, Obj1)
+ end, [], Routes),
+ rabbit_exchange_type_topic:route(X, D);
+ _Err ->
+ %io:format("err: ~p~n", [Err]),
+ error_logger:error_msg("Could not connect to Riak"),
+ {unroutable, []}
+ end.
+
+do_add_binding(X, B) ->
+ rabbit_exchange_type_topic:add_binding(X, B).
+
+exchange_a(#exchange{name = #resource{virtual_host=VirtualHost, name=Name}}) ->
+ list_to_atom(lists:flatten(io_lib:format("~s ~s", [VirtualHost, Name]))).
+
+%client_a(Host, Port) ->
+% list_to_atom(lists:flatten(io_lib:format("~s ~p", [Host, Port]))).
+
+get_riak_client(X=#exchange{arguments = Args}) ->
+ Host = case lists:keyfind(?HOST, 1, Args) of
+ {_, _, H} -> binary_to_list(H);
+ _ -> "127.0.0.1"
+ end,
+ Port = case lists:keyfind(?PORT, 1, Args) of
+ {_, _, P} ->
+ {Pn, _} = string:to_integer(binary_to_list(P)),
+ Pn;
+ _ -> 8087
+ end,
+ MaxClients = case lists:keyfind(?MAX_CLIENTS, 1, Args) of
+ {_, _, MC} ->
+ {MCn, _} = string:to_integer(binary_to_list(MC)),
+ MCn;
+ _ -> 5
+ end,
+ XA = exchange_a(X),
+
+ try
+ case pg2:get_closest_pid(XA) of
+ {error, _} -> create_riak_client(XA, Host, Port, MaxClients);
+ PbClient ->
+ case riakc_pb_socket:ping(PbClient) of
+ pong ->
+ %io:format("returning good client: ~p~n", [PbClient]),
+ {ok, PbClient};
+ _Else ->
+ pg2:leave(XA, PbClient),
+ get_riak_client(X)
+ end
+ end
+ catch
+ _ -> create_riak_client(XA, Host, Port, MaxClients)
+ end.
+
+create_riak_client(XA, Host, Port, MaxClients) ->
+ case riakc_pb_socket:start_link(Host, Port) of
+ {ok, PbClient} ->
+ pg2:join(XA, PbClient),
+ case length(pg2:get_members(XA)) of
+ S when (S < MaxClients) -> create_riak_client(XA, Host, Port, MaxClients);
+ _ -> {ok, PbClient}
+ end;
+ Err -> Err
+ end.
Please sign in to comment.
Something went wrong with that request. Please try again.