Permalink
Browse files

refactoring

  • Loading branch information...
1 parent c56e62d commit 2a34cf0f87b5ac0a9be38183e2a3e7749b6ff048 @videlalvaro videlalvaro committed Oct 15, 2011
Showing with 49 additions and 37 deletions.
  1. +49 −37 src/rabbit_exchange_type_recent_history.erl
@@ -33,51 +33,32 @@ description() ->
serialise_events() -> false.
-route(#exchange{name = Name},
+route(#exchange{name = XName},
#delivery{message = #basic_message{
content = Content
}}) ->
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- InCache = mnesia:read(?RH_TABLE, Name, write),
- Msgs = case InCache of
- [#cached{key=Name, content=Cached}] ->
- Cached;
- _ ->
- []
- end,
- mnesia:write(?RH_TABLE,
- #cached{key = Name, content = [Content|lists:sublist(Msgs, ?KEEP_NB)]},
- write)
- end),
- rabbit_router:match_routing_key(Name, ['_']).
+ cache_msg(XName, Content),
+ rabbit_router:match_routing_key(XName, ['_']).
validate(_X) -> ok.
create(_Tx, _X) -> ok.
-delete(_Tx, #exchange{ name = Name }, _Bs) ->
- rabbit_misc:execute_mnesia_transaction(
- fun() ->
- mnesia:delete(?RH_TABLE, Name, write)
- end),
- ok.
+delete(_Tx, #exchange{ name = XName }, _Bs) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun() ->
+ mnesia:delete(?RH_TABLE, XName, write)
+ end),
+ ok.
add_binding(_Tx, #exchange{ name = XName },
- #binding{ destination = QueueName }) ->
- case rabbit_amqqueue:lookup(QueueName) of
+ #binding{ destination = QName }) ->
+ case rabbit_amqqueue:lookup(QName) of
{error, not_found} ->
- rabbit_misc:protocol_error(
- internal_error,
- "could not find queue '~s'",
- [QueueName]);
- {ok, #amqqueue{ pid = Queue }} ->
- Msgs = case mnesia:dirty_read(?RH_TABLE, XName) of
- [] ->
- [];
- [#cached{content=Cached}] ->
- load_from_content(XName, Cached)
- end,
- deliver_messages(Queue, Msgs)
+ queue_not_found_error(QName);
+ {ok, #amqqueue{ pid = QPid }} ->
+ Cached = get_msgs_from_cache(XName),
+ Msgs = msgs_from_content(XName, Cached),
+ deliver_messages(QPid, Msgs)
end,
ok.
@@ -86,6 +67,7 @@ remove_bindings(_Tx, _X, _Bs) -> ok.
assert_args_equivalence(X, Args) ->
rabbit_exchange_type_direct:assert_args_equivalence(X, Args).
+%%private
setup_schema() ->
case mnesia:create_table(?RH_TABLE,
[{attributes, record_info(fields, cached)},
@@ -95,7 +77,31 @@ setup_schema() ->
{aborted, {already_exists, ?RH_TABLE}} -> ok
end.
-load_from_content(XName, Cached) ->
+cache_msg(XName, Content) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ Cached = get_msgs_from_cache(XName),
+ store_msg(XName, Cached, Content)
+ end).
+
+get_msgs_from_cache(XName) ->
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read(?RH_TABLE, XName) of
+ [] ->
+ [];
+ [#cached{key = XName, content=Cached}] ->
+ Cached
+ end
+ end).
+
+store_msg(Key, Cached, Content) ->
+ mnesia:write(?RH_TABLE,
+ #cached{key = Key,
+ content = [Content|lists:sublist(Cached, ?KEEP_NB)]},
+ write).
+
+msgs_from_content(XName, Cached) ->
lists:map(
fun(Content) ->
{Props, Payload} = rabbit_basic:from_content(Content),
@@ -107,4 +113,10 @@ deliver_messages(Queue, Msgs) ->
fun (Msg) ->
Delivery = rabbit_basic:delivery(false, false, Msg, undefined),
rabbit_amqqueue:deliver(Queue, Delivery)
- end, Msgs).
+ end, Msgs).
+
+queue_not_found_error(QName) ->
+ rabbit_misc:protocol_error(
+ internal_error,
+ "could not find queue '~s'",
+ [QName]).

0 comments on commit 2a34cf0

Please sign in to comment.