Skip to content
This repository has been archived by the owner on Nov 17, 2020. It is now read-only.

Commit

Permalink
allows controlling amount of cached msgs with argument
Browse files Browse the repository at this point in the history
  • Loading branch information
videlalvaro committed Mar 26, 2014
1 parent d990c3b commit 131ef8c
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 7 deletions.
3 changes: 3 additions & 0 deletions include/rabbit_recent_history.hrl
@@ -0,0 +1,3 @@
-define(KEEP_NB, 20).
-define(RH_TABLE, rh_exchange_table).
-record(cached, {key, content}).
1 change: 1 addition & 0 deletions package.mk
@@ -1,2 +1,3 @@
DEPS:=rabbitmq-server rabbitmq-erlang-client
RETAIN_ORIGINAL_VERSION:=true
WITH_BROKER_TEST_COMMANDS:=rabbit_exchange_type_recent_history_test:test()
12 changes: 5 additions & 7 deletions src/rabbit_exchange_type_recent_history.erl
Expand Up @@ -2,14 +2,15 @@

-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").
-include("rabbit_recent_history.hrl").

-behaviour(rabbit_exchange_type).

-import(rabbit_misc, [table_lookup/2]).

-export([description/0, serialise_events/0, route/2]).
-export([validate/1, validate_binding/2, create/2, delete/3, add_binding/3,
remove_bindings/3, assert_args_equivalence/2]).
remove_bindings/3, assert_args_equivalence/2, policy_changed/2]).
-export([setup_schema/0]).

-rabbit_boot_step({?MODULE,
Expand All @@ -27,10 +28,6 @@
{requires, database},
{enables, external_infrastructure}]}).

-define(KEEP_NB, 20).
-define(RH_TABLE, rh_exchange_table).
-record(cached, {key, content}).

description() ->
[{name, <<"x-recent-history">>},
{description, <<"Recent History Exchange.">>}].
Expand All @@ -47,6 +44,7 @@ route(#exchange{name = XName,
validate(_X) -> ok.
validate_binding(_X, _B) -> ok.
create(_Tx, _X) -> ok.
policy_changed(_X1, _X2) -> ok.

delete(transaction, #exchange{ name = XName }, _Bs) ->
rabbit_misc:execute_mnesia_transaction(
Expand Down Expand Up @@ -121,7 +119,7 @@ get_msgs_from_cache(XName) ->

store_msg(Key, Cached, Content, undefined) ->
store_msg0(Key, Cached, Content, ?KEEP_NB);
store_msg(Key, Cached, Content, Length) ->
store_msg(Key, Cached, Content, {_Type, Length}) ->
store_msg0(Key, Cached, Content, Length).

store_msg0(Key, Cached, Content, Length) ->
Expand All @@ -141,7 +139,7 @@ deliver_messages(Queue, Msgs) ->
error_logger:info_msg("Queue: ~p length: ~p~n", [Queue, length(Msgs)]),
lists:map(
fun (Msg) ->
Delivery = rabbit_basic:delivery(false, Msg, undefined),
Delivery = rabbit_basic:delivery(false, false, Msg, undefined),
rabbit_amqqueue:deliver([Queue], Delivery)
end, lists:reverse(Msgs)).

Expand Down
93 changes: 93 additions & 0 deletions test/src/rabbit_exchange_type_recent_history_test.erl
@@ -0,0 +1,93 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ Consistent Hash Exchange.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2011-2014 GoPivotal, Inc. All rights reserved.
%%

-module(rabbit_exchange_type_recent_history_test).

-export([test/0]).

-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
-include("rabbit_recent_history.hrl").

test() ->
t([<<"q0">>, <<"q1">>, <<"q2">>, <<"q3">>]).

t(Qs) ->
ok = test_with_default_length(Qs),
ok = test_with_length(Qs),
ok.

test_with_default_length(Qs) ->
test0(fun () ->
#'basic.publish'{exchange = <<"e">>}
end,
fun() ->
#amqp_msg{props = #'P_basic'{}, payload = <<>>}
end, [], Qs, 100, length(Qs) * ?KEEP_NB).

test_with_length(Qs) ->
test0(fun () ->
#'basic.publish'{exchange = <<"e">>}
end,
fun() ->
#amqp_msg{props = #'P_basic'{}, payload = <<>>}
end, [{<<"history-length">>, long, 30}], Qs, 100, length(Qs) * 30).

test0(MakeMethod, MakeMsg, DeclareArgs, Queues, MsgCount, ExpectedCount) ->

{ok, Conn} = amqp_connection:start(#amqp_params_network{}),
{ok, Chan} = amqp_connection:open_channel(Conn),
#'exchange.declare_ok'{} =
amqp_channel:call(Chan,
#'exchange.declare' {
exchange = <<"e">>,
type = <<"x-recent-history">>,
auto_delete = true,
arguments = DeclareArgs
}),

#'tx.select_ok'{} = amqp_channel:call(Chan, #'tx.select'{}),
[amqp_channel:call(Chan,
MakeMethod(),
MakeMsg()) || _ <- lists:duplicate(MsgCount, const)],
amqp_channel:call(Chan, #'tx.commit'{}),

[#'queue.declare_ok'{} =
amqp_channel:call(Chan, #'queue.declare' {
queue = Q, exclusive = true }) || Q <- Queues],

[#'queue.bind_ok'{} =
amqp_channel:call(Chan, #'queue.bind' { queue = Q,
exchange = <<"e">>,
routing_key = <<"">>})
|| Q <- Queues],

Counts =
[begin
#'queue.declare_ok'{message_count = M} =
amqp_channel:call(Chan, #'queue.declare' {queue = Q,
exclusive = true }),
M
end || Q <- Queues],


?assertEqual(ExpectedCount, lists:sum(Counts)),

amqp_channel:call(Chan, #'exchange.delete' { exchange = <<"e">> }),
[amqp_channel:call(Chan, #'queue.delete' { queue = Q }) || Q <- Queues],
amqp_channel:close(Chan),
amqp_connection:close(Conn),
ok.

0 comments on commit 131ef8c

Please sign in to comment.