Skip to content

Commit

Permalink
Update for many changes in rabbit_server. In particular: participate …
Browse files Browse the repository at this point in the history
…in transactions, and account for event serial numbers
  • Loading branch information
squaremo committed Jun 17, 2011
1 parent 8a06a79 commit cbbac83
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 45 deletions.
4 changes: 2 additions & 2 deletions ebin/rabbitmq_lvc.app
@@ -1,6 +1,6 @@
{application, rabbit_lvc_plugin,
{application, rabbitmq_lvc,
[{description, "RabbitMQ last-value cache exchange plugin"},
{vsn, "0.01"},
{vsn, "0.0.1"},
{modules, [
rabbit_lvc_plugin,
rabbit_exchange_type_lvc
Expand Down
70 changes: 28 additions & 42 deletions src/rabbit_exchange_type_lvc.erl
Expand Up @@ -4,24 +4,24 @@

-behaviour(rabbit_exchange_type).

-export([description/0, route/2]).
-export([description/0, serialise_events/0, route/2]).
-export([validate/1, create/2, recover/2, delete/3,
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).

-define(TX, false).

-include_lib("rabbit_common/include/rabbit_exchange_type_spec.hrl").

description() ->
[{name, <<"lvc">>},
{description, <<"Last-value cache exchange.">>}].

serialise_events() -> false.

route(Exchange = #exchange{name = Name},
Delivery = #delivery{message = #basic_message{
routing_key = RK,
routing_keys = RKs,
content = Content
}}) ->
Keys = case RK of
Keys = case RKs of
CC when is_list(CC) -> CC;
To -> [To]
end,
Expand All @@ -40,22 +40,18 @@ validate(_X) -> ok.
create(_Tx, _X) -> ok.
recover(_X, _Bs) -> ok.

delete(?TX, #exchange{ name = Name }, _Bs) ->
rabbit_misc:execute_mnesia_transaction(
fun() ->
[mnesia:delete(?LVC_TABLE, K, write) ||
#cached{ key = K } <-
mnesia:match_object(?LVC_TABLE,
#cached{key = #cachekey{
exchange = Name, _ = '_' },
_ = '_'}, write)]
end),
ok;
delete(transaction, #exchange{ name = Name }, _Bs) ->
[mnesia:delete(?LVC_TABLE, K, write) ||
#cached{ key = K } <-
mnesia:match_object(?LVC_TABLE,
#cached{key = #cachekey{
exchange = Name, _ = '_' },
_ = '_'}, write)],
ok;
delete(_Tx, _X, _Bs) ->
ok.


add_binding(?TX, #exchange{ name = XName },
add_binding(none, #exchange{ name = XName },
#binding{ key = RoutingKey,
destination = QueueName }) ->
case rabbit_amqqueue:lookup(QueueName) of
Expand All @@ -65,30 +61,20 @@ add_binding(?TX, #exchange{ name = XName },
"could not find queue '~s'",
[QueueName]);
{ok, #amqqueue{ pid = Q }} ->
%% RabbitMQ doesn't support multiple binding keys YET
Keys = case RoutingKey of
CC when is_list(CC) -> CC;
To -> [To]
end,
Values = lists:foldl(
fun (Key, Msgs) ->
case mnesia:dirty_read(
?LVC_TABLE,
#cachekey{exchange=XName,
routing_key=Key}) of
[] ->
Msgs;
[#cached{content = Content}] ->
{Props, Payload} =
rabbit_basic:from_content(Content),
[rabbit_basic:message(
XName, Key, Props, Payload) | Msgs]
end
end,
[], Keys),
[rabbit_amqqueue:deliver(
Q, rabbit_basic:delivery(false, false, none, V, undefined)) ||
V <- Values]
case mnesia:dirty_read(
?LVC_TABLE,
#cachekey{ exchange=XName,
routing_key=RoutingKey }) of
[] ->
ok;
[#cached{content = Content}] ->
{Props, Payload} =
rabbit_basic:from_content(Content),
Msg = rabbit_basic:message(
XName, RoutingKey, Props, Payload),
rabbit_amqqueue:deliver(
Q, rabbit_basic:delivery(false, false, none, Msg, undefined))
end
end,
ok;
add_binding(_Tx, _X, _B) ->
Expand Down
2 changes: 1 addition & 1 deletion src/rabbit_lvc_plugin.erl
Expand Up @@ -9,7 +9,7 @@
{mfa, {rabbit_lvc_plugin, setup_schema, []}},
{mfa, {rabbit_registry, register, [exchange, <<"x-lvc">>, rabbit_exchange_type_lvc]}},
{requires, rabbit_registry},
{enables, exchange_recovery}]}).
{enables, recovery}]}).

%% private

Expand Down

0 comments on commit cbbac83

Please sign in to comment.