Permalink
Browse files

first import

  • Loading branch information...
0 parents commit f919c24267f616e29d38da6a94ea6a41c045f893 @videlalvaro videlalvaro committed May 6, 2011
Showing with 201 additions and 0 deletions.
  1. +4 −0 .gitignore
  2. +21 −0 LICENSE.md
  3. +37 −0 Makefile
  4. +16 −0 README.md
  5. +3 −0 include/rabbit_rh_plugin.hrl
  6. +5 −0 rebar.config
  7. +82 −0 src/rabbit_exchange_type_rh.erl
  8. +23 −0 src/rabbit_rh_plugin.erl
  9. +10 −0 src/rabbitmq_rh_exchange.app.src
@@ -0,0 +1,4 @@
+.DS_Store
+deps
+dist
+ebin
@@ -0,0 +1,21 @@
+The MIT License
+
+Copyright (c) 2011 Alvaro Videla
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
@@ -0,0 +1,37 @@
+PACKAGE=rh-exchange
+DIST_DIR=dist
+EBIN_DIR=ebin
+INCLUDE_DIRS=include
+DEPS_DIR=deps
+DEPS ?=
+DEPS_EZ=$(foreach DEP, $(DEPS), $(DEPS_DIR)/$(DEP).ez)
+RABBITMQ_HOME ?= .
+
+all: compile
+
+clean:
+ rm -rf $(DIST_DIR)
+ rm -rf $(EBIN_DIR)
+
+distclean: clean
+ rm -rf $(DEPS_DIR)
+
+package: compile $(DEPS_EZ)
+ rm -f $(DIST_DIR)/$(PACKAGE).ez
+ mkdir -p $(DIST_DIR)/$(PACKAGE)
+ cp -r $(EBIN_DIR) $(DIST_DIR)/$(PACKAGE)
+ $(foreach EXTRA_DIR, $(INCLUDE_DIRS), cp -r $(EXTRA_DIR) $(DIST_DIR)/$(PACKAGE);)
+ (cd $(DIST_DIR); zip -r $(PACKAGE).ez $(PACKAGE))
+
+install: package
+ $(foreach DEP, $(DEPS_EZ), cp $(DEP) $(RABBITMQ_HOME)/plugins;)
+ cp $(DIST_DIR)/$(PACKAGE).ez $(RABBITMQ_HOME)/plugins
+
+$(DEPS_DIR):
+ ./rebar get-deps
+
+$(DEPS_EZ):
+ cd $(DEPS_DIR); $(foreach DEP, $(DEPS), zip -r $(DEP).ez $(DEP);)
+
+compile: $(DEPS_DIR)
+ ./rebar compile
@@ -0,0 +1,16 @@
+# RabbitMQ Recent History Cache
+
+Keeps track of the last 20 messages that passed thorugh the exchange. Everytime a queue is bound to the exchange it delivers that last 20 messages to them. This is usefull for implementing a very simple __Chat History__ where clients that join the conversation can get the latest messages.
+
+Exchange Type: `x-recent-history`
+
+## Installation ##
+
+ git clone git://github.com/videlalvaro/rabbitmq-recent-history-exchange.git
+ cd rabbitmq-recent-history-exchange
+ make package
+ cp dist/*.ez $RABBITMQ_HOME/plugins
+
+## License ##
+
+See LICENSE.md
@@ -0,0 +1,3 @@
+-define(RH_TABLE, rh_exchange_table).
+
+-record(cached, {key, content}).
@@ -0,0 +1,5 @@
+{
+ deps, [
+ {rabbit_common, ".*", {git, "https://github.com/jbrisbin/rabbit_common.git", "HEAD"}}
+ ]
+}.
@@ -0,0 +1,82 @@
+-module(rabbit_exchange_type_rh).
+-include_lib("rabbit_common/include/rabbit.hrl").
+-include("rabbit_rh_plugin.hrl").
+
+-behaviour(rabbit_exchange_type).
+
+-export([description/0, route/2]).
+-export([validate/1, create/2, recover/2, delete/3,
+ add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
+
+-define(TX, false).
+-define(KEEP_NB, 20).
+
+-include_lib("rabbit_common/include/rabbit_exchange_type_spec.hrl").
+
+description() ->
+ [{name, <<"recent-history">>},
+ {description, <<"List of Last-value caches exchange.">>}].
+
+route(#exchange{name = Name},
+ #delivery{message = #basic_message{
+ content = Content
+ }}) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ InCache = mnesia:read(?RH_TABLE, Name, write),
+ Msgs = case InCache of
+ [#cached{key=Name, content=Cached}] ->
+ Cached;
+ _ ->
+ []
+ end,
+ mnesia:write(?RH_TABLE,
+ #cached{key = Name, content = [Content|lists:sublist(Msgs, ?KEEP_NB)]},
+ write)
+ end),
+ rabbit_router:match_routing_key(Name, ['_']).
+
+validate(_X) -> ok.
+create(_Tx, _X) -> ok.
+recover(_X, _Bs) -> ok.
+
+delete(?TX, #exchange{ name = Name }, _Bs) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ mnesia:delete(?RH_TABLE, Name, write)
+ end),
+ ok;
+delete(_Tx, _X, _Bs) ->
+ ok.
+
+add_binding(?TX, #exchange{ name = XName },
+ #binding{ destination = QueueName }) ->
+ case rabbit_amqqueue:lookup(QueueName) of
+ {error, not_found} ->
+ rabbit_misc:protocol_error(
+ internal_error,
+ "could not find queue '~s'",
+ [QueueName]);
+ {ok, #amqqueue{ pid = Q }} ->
+ Values = case mnesia:dirty_read(?RH_TABLE, XName) of
+ [] ->
+ [];
+ [#cached{content=Cached}] ->
+ lists:map(
+ fun(Content) ->
+ {Props, Payload} = rabbit_basic:from_content(Content),
+ rabbit_basic:message(XName, <<"">>, Props, Payload)
+ end, Cached)
+ end,
+ [rabbit_amqqueue:deliver(
+ Q, rabbit_basic:delivery(false, false, none, V, undefined)) ||
+ V <- lists:reverse(Values)]
+ end,
+ ok;
+add_binding(_Tx, _X, _B) ->
+ ok.
+
+remove_bindings(_Tx, _X, _Bs) -> ok.
+
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange_type_direct:assert_args_equivalence(X, Args).
@@ -0,0 +1,23 @@
+-module(rabbit_rh_plugin).
+
+-include("rabbit_rh_plugin.hrl").
+
+-export([setup_schema/0]).
+
+-rabbit_boot_step({?MODULE,
+ [{description, "recent history exchange type"},
+ {mfa, {rabbit_rh_plugin, setup_schema, []}},
+ {mfa, {rabbit_registry, register, [exchange, <<"x-recent-history">>, rabbit_exchange_type_rh]}},
+ {requires, rabbit_registry},
+ {enables, exchange_recovery}]}).
+
+%% private
+
+setup_schema() ->
+ case mnesia:create_table(?RH_TABLE,
+ [{attributes, record_info(fields, cached)},
+ {record_name, cached},
+ {type, set}]) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, ?RH_TABLE}} -> ok
+ end.
@@ -0,0 +1,10 @@
+{application, rabbitmq_rh_exchange,
+ [
+ {description, "RabbitMQ Recent History Exchange"},
+ {vsn, "0.1.0"},
+ {modules, []},
+ {registered, []},
+ {env, []},
+ {applications, [kernel, stdlib, rabbit, mnesia]}
+ ]
+}.

0 comments on commit f919c24

Please sign in to comment.