Permalink
Browse files

refactors exchange module

  • Loading branch information...
1 parent 54d7e9e commit 08595860efab35e050808a67c50e14a33cdc90eb @videlalvaro videlalvaro committed Oct 14, 2011
Showing with 67 additions and 39 deletions.
  1. +67 −39 src/rabbit_exchange_type_rh.erl
@@ -1,21 +1,37 @@
-module(rabbit_exchange_type_rh).
-include_lib("rabbit_common/include/rabbit.hrl").
--include("rabbit_rh_plugin.hrl").
+-include_lib("rabbit_common/include/rabbit_exchange_type_spec.hrl").
-behaviour(rabbit_exchange_type).
--export([description/0, route/2]).
--export([validate/1, create/2, recover/2, delete/3,
- add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
+-export([description/0, serialise_events/0, route/2]).
+-export([validate/1, create/2, delete/3, add_binding/3,
+ remove_bindings/3, assert_args_equivalence/2]).
+-export([setup_schema/0]).
--define(TX, false).
--define(KEEP_NB, 20).
+-rabbit_boot_step({rabbit_exchange_type_rh_registry,
+[{description, "recent history exchange type: registry"},
+ {mfa, {rabbit_registry, register,
+ [exchange, <<"x-recent-history">>,
+ rabbit_exchange_type_rh]}},
+ {requires, rabbit_registry},
+ {enables, kernel_ready}]}).
--include_lib("rabbit_common/include/rabbit_exchange_type_spec.hrl").
+-rabbit_boot_step({rabbit_exchange_type_rh_mnesia,
+ [{description, "recent history exchange type: mnesia"},
+ {mfa, {?MODULE, setup_schema, []}},
+ {requires, database},
+ {enables, external_infrastructure}]}).
+
+-define(KEEP_NB, 20).
+-define(RH_TABLE, rh_exchange_table).
+-record(cached, {key, content}).
description() ->
- [{name, <<"recent-history">>},
- {description, <<"List of Last-value caches exchange.">>}].
+ [{name, <<"recent-history">>},
+ {description, <<"List of Last-value caches exchange.">>}].
+
+serialise_events() -> false.
route(#exchange{name = Name},
#delivery{message = #basic_message{
@@ -38,45 +54,57 @@ route(#exchange{name = Name},
validate(_X) -> ok.
create(_Tx, _X) -> ok.
-recover(_X, _Bs) -> ok.
-delete(?TX, #exchange{ name = Name }, _Bs) ->
+delete(_Tx, #exchange{ name = Name }, _Bs) ->
rabbit_misc:execute_mnesia_transaction(
fun() ->
mnesia:delete(?RH_TABLE, Name, write)
end),
- ok;
-delete(_Tx, _X, _Bs) ->
ok.
-add_binding(?TX, #exchange{ name = XName },
+add_binding(_Tx, #exchange{ name = XName },
#binding{ destination = QueueName }) ->
- case rabbit_amqqueue:lookup(QueueName) of
- {error, not_found} ->
- rabbit_misc:protocol_error(
- internal_error,
- "could not find queue '~s'",
- [QueueName]);
- {ok, #amqqueue{ pid = Q }} ->
- Values = case mnesia:dirty_read(?RH_TABLE, XName) of
- [] ->
- [];
- [#cached{content=Cached}] ->
- lists:map(
- fun(Content) ->
- {Props, Payload} = rabbit_basic:from_content(Content),
- rabbit_basic:message(XName, <<"">>, Props, Payload)
- end, Cached)
- end,
- [rabbit_amqqueue:deliver(
- Q, rabbit_basic:delivery(false, false, none, V, undefined)) ||
- V <- lists:reverse(Values)]
- end,
- ok;
-add_binding(_Tx, _X, _B) ->
- ok.
+ case rabbit_amqqueue:lookup(QueueName) of
+ {error, not_found} ->
+ rabbit_misc:protocol_error(
+ internal_error,
+ "could not find queue '~s'",
+ [QueueName]);
+ {ok, #amqqueue{ pid = Queue }} ->
+ Msgs = case mnesia:dirty_read(?RH_TABLE, XName) of
+ [] ->
+ [];
+ [#cached{content=Cached}] ->
+ load_from_content(XName, Cached)
+ end,
+ deliver_messages(Queue, Msgs)
+ end,
+ ok.
remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
- rabbit_exchange_type_direct:assert_args_equivalence(X, Args).
+ rabbit_exchange_type_direct:assert_args_equivalence(X, Args).
+
+setup_schema() ->
+ case mnesia:create_table(?RH_TABLE,
+ [{attributes, record_info(fields, cached)},
+ {record_name, cached},
+ {type, set}]) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, ?RH_TABLE}} -> ok
+ end.
+
+load_from_content(Cached, XName) ->
+ lists:map(
+ fun(Content) ->
+ {Props, Payload} = rabbit_basic:from_content(Content),
+ rabbit_basic:message(XName, <<"">>, Props, Payload)
+ end, Cached).
+
+deliver_messages(Queue, Msgs) ->
+ lists:map(
+ fun (Msg) ->
+ Delivery = rabbit_basic:delivery(false, false, none, Msg, undefined),
+ rabbit_amqqueue:deliver(Queue, Delivery)
+ end, Msgs).

0 comments on commit 0859586

Please sign in to comment.