Skip to content

Commit

Permalink
Unit test rabbit_db_* modules, spec and API updates
Browse files Browse the repository at this point in the history
  • Loading branch information
dcorbacho committed Feb 2, 2023
1 parent 7a12cf8 commit 9cf10ed
Show file tree
Hide file tree
Showing 46 changed files with 3,360 additions and 919 deletions.
45 changes: 40 additions & 5 deletions deps/rabbit/BUILD.bazel
Expand Up @@ -513,15 +513,12 @@ rabbitmq_integration_suite(
],
)

rabbitmq_suite(
rabbitmq_integration_suite(
name = "mirrored_supervisor_SUITE",
size = "small",
additional_srcs = [
"test/mirrored_supervisor_SUITE_gs.erl",
],
deps = [
"//deps/rabbit_common:erlang_app",
],
]
)

rabbitmq_suite(
Expand Down Expand Up @@ -1096,13 +1093,51 @@ rabbitmq_integration_suite(
rabbitmq_integration_suite(
name = "exchanges_SUITE",
size = "small",
additional_beam = [
":quorum_queue_utils",
],
)

rabbitmq_integration_suite(
name = "bindings_SUITE",
size = "small",
)

rabbitmq_integration_suite(
name = "rabbit_db_queue_SUITE",
size = "small",
)

rabbitmq_integration_suite(
name = "rabbit_db_maintenance_SUITE",
size = "small",
)

rabbitmq_integration_suite(
name = "rabbit_db_topic_exchange_SUITE",
size = "small",
)

rabbitmq_integration_suite(
name = "rabbit_db_exchange_SUITE",
size = "small",
)

rabbitmq_integration_suite(
name = "rabbit_db_binding_SUITE",
size = "small",
)

rabbitmq_integration_suite(
name = "rabbit_db_msup_SUITE",
size = "small",
)

rabbitmq_integration_suite(
name = "rabbit_db_policy_SUITE",
size = "small",
)

assert_suites()

filegroup(
Expand Down
183 changes: 103 additions & 80 deletions deps/rabbit/src/rabbit_amqqueue.erl
Expand Up @@ -12,7 +12,7 @@
delete_immediately/1, delete_exclusive/2, delete/4, purge/1,
forget_all_durable/1]).
-export([pseudo_queue/2, pseudo_queue/3, immutable/1]).
-export([exists/1, lookup/1, lookup/2, lookup_many/1,
-export([exists/1, lookup/1, lookup/2, lookup_many/1, lookup_durable_queue/1,
not_found_or_absent_dirty/1,
with/2, with/3, with_or_die/2,
assert_equivalence/5,
Expand Down Expand Up @@ -162,24 +162,28 @@ start(Qs) ->
amqqueue:is_classic(Q)],
ok.

mark_local_durable_queues_stopped(VHost) ->
Qs0 = find_local_durable_queues(VHost),
Qs = [amqqueue:set_state(Q, stopped)
|| Q <- Qs0, amqqueue:get_type(Q) =:= rabbit_classic_queue,
amqqueue:get_state(Q) =/= stopped ],
rabbit_db_queue:insert(Qs).

find_local_durable_queues(VHost) ->
Qs = rabbit_db_queue:get_all_durable(VHost),
lists:filter(fun(Q) ->
rabbit_queue_type:is_recoverable(Q)
end, Qs).
mark_local_durable_queues_stopped(VHostName) ->
rabbit_db_queue:update_durable(
fun(Q) ->
amqqueue:set_state(Q, stopped)
end,
fun(Q) ->
amqqueue:get_vhost(Q) =:= VHostName andalso
rabbit_queue_type:is_recoverable(Q) andalso
amqqueue:get_type(Q) =:= rabbit_classic_queue andalso
amqqueue:get_state(Q) =/= stopped
end).

find_local_durable_queues(VHostName) ->
rabbit_db_queue:filter_all_durable(fun(Q) ->
amqqueue:get_vhost(Q) =:= VHostName andalso
rabbit_queue_type:is_recoverable(Q)
end).

find_recoverable_queues() ->
Qs = rabbit_db_queue:get_all_durable(),
lists:filter(fun(Q) ->
rabbit_queue_type:is_recoverable(Q)
end, Qs).
rabbit_db_queue:filter_all_durable(fun(Q) ->
rabbit_queue_type:is_recoverable(Q)
end).

-spec declare(name(),
boolean(),
Expand Down Expand Up @@ -248,13 +252,12 @@ internal_declare(Q, Recover) ->

do_internal_declare(Q0, true) ->
Q = amqqueue:set_state(Q0, live),
store_queue(Q),
ok = store_queue(Q),
{created, Q0};
do_internal_declare(Q0, false) ->
Q = rabbit_policy:set(amqqueue:set_state(Q0, live)),
Queue = rabbit_queue_decorator:set(Q),
DurableQueue = amqqueue:reset_mirroring_and_decorators(Q),
rabbit_db_queue:create_or_get(DurableQueue, Queue).
rabbit_db_queue:create_or_get(Queue).

-spec update
(name(), fun((amqqueue:amqqueue()) -> amqqueue:amqqueue())) ->
Expand All @@ -272,8 +275,7 @@ ensure_rabbit_queue_record_is_initialized(Q) ->

store_queue(Q0) ->
Q = rabbit_queue_decorator:set(Q0),
DurableQ = amqqueue:reset_mirroring_and_decorators(Q0),
rabbit_db_queue:insert(DurableQ, Q).
rabbit_db_queue:set(Q).

-spec update_decorators(name()) -> 'ok'.

Expand Down Expand Up @@ -316,14 +318,17 @@ is_server_named_allowed(Args) ->
([name()]) ->
[amqqueue:amqqueue()].

lookup([]) -> []; %% optimisation
lookup(Names) ->
rabbit_db_queue:get(Names).
lookup(Name) when is_record(Name, resource) ->
rabbit_db_queue:get(Name).

lookup_durable_queue(QName) ->
rabbit_db_queue:get_durable(QName).

-spec lookup_many ([name()]) -> [amqqueue:amqqueue()].

lookup_many([]) -> []; %% optimisation
lookup_many(Names) when is_list(Names) ->
lookup(Names).
rabbit_db_queue:get_many(Names).

-spec lookup(binary(), binary()) ->
rabbit_types:ok(amqqueue:amqqueue()) |
Expand All @@ -341,7 +346,15 @@ exists(Name) ->
-spec not_found_or_absent_dirty(name()) -> not_found_or_absent().

not_found_or_absent_dirty(Name) ->
rabbit_db_queue:not_found_or_absent_queue_dirty(Name).
%% We should read from both tables inside a tx, to get a
%% consistent view. But the chances of an inconsistency are small,
%% and only affect the error kind.
case rabbit_db_queue:get_durable(Name) of
{error, not_found} ->
not_found;
{ok, Q} ->
{absent, Q, nodedown}
end.

-spec get_rebalance_lock(pid()) ->
{true, {rebalance_queues, pid()}} | false.
Expand Down Expand Up @@ -542,7 +555,7 @@ with(#resource{} = Name, F, E, RetriesLeft) ->
fun () -> retry_wait(Q, F, E, RetriesLeft) end,
fun () -> F(Q) end);
{error, not_found} ->
E(rabbit_db_queue:not_found_or_absent_queue_dirty(Name))
E(not_found_or_absent_dirty(Name))
end.

-spec retry_wait(amqqueue:amqqueue(),
Expand Down Expand Up @@ -1239,16 +1252,18 @@ list_down(VHostPath) ->
false -> [];
true ->
Alive = sets:from_list([amqqueue:get_name(Q) || Q <- list(VHostPath)]),
Durable = rabbit_db_queue:get_all_durable(VHostPath),
NodesRunning = rabbit_nodes:all_running(),
lists:filter(fun (Q) ->
N = amqqueue:get_name(Q),
Pid = amqqueue:get_pid(Q),
St = amqqueue:get_state(Q),
(St =:= stopped andalso not lists:member(node(Pid), NodesRunning))
orelse
(not sets:is_element(N, Alive))
end, Durable)
rabbit_db_queue:filter_all_durable(
fun (Q) ->
N = amqqueue:get_name(Q),
Pid = amqqueue:get_pid(Q),
St = amqqueue:get_state(Q),
amqqueue:get_vhost(Q) =:= VHostPath
andalso
((St =:= stopped andalso not lists:member(node(Pid), NodesRunning))
orelse
(not sets:is_element(N, Alive)))
end)
end.

count(VHost) ->
Expand Down Expand Up @@ -1671,7 +1686,7 @@ internal_delete(QueueName, ActingUser, Reason) ->
ok ->
ok;
Deletions ->
rabbit_binding:process_deletions(Deletions),
_ = rabbit_binding:process_deletions(Deletions),
rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER),
rabbit_core_metrics:queue_deleted(QueueName),
ok = rabbit_event:notify(queue_deleted,
Expand All @@ -1683,12 +1698,12 @@ internal_delete(QueueName, ActingUser, Reason) ->

forget_all_durable(Node) ->
UpdateFun = fun(Q) ->
forget_node_for_queue(Node, Q)
end,
forget_node_for_queue(Node, Q)
end,
FilterFun = fun(Q) ->
is_local_to_node(amqqueue:get_pid(Q), Node)
end,
rabbit_db_queue:match_and_update(amqqueue:pattern_match_all(), UpdateFun, FilterFun).
rabbit_db_queue:foreach_durable(UpdateFun, FilterFun).

%% Try to promote a mirror while down - it should recover as a
%% leader. We try to take the oldest mirror here for best chance of
Expand Down Expand Up @@ -1717,7 +1732,11 @@ forget_node_for_queue(DeadNode, [H|T], Q) when ?is_amqqueue(Q) ->
{false, _} -> forget_node_for_queue(DeadNode, T, Q);
{true, rabbit_classic_queue} ->
Q1 = amqqueue:set_pid(Q, rabbit_misc:node_to_fake_pid(H)),
ok = rabbit_db_queue:insert([Q1]);
%% rabbit_db_queue:set_many/1 just stores a durable queue record,
%% that is the only one required here.
%% rabbit_db_queue:set/1 writes both durable and transient, thus
%% can't be used for this operation.
ok = rabbit_db_queue:set_many([Q1]);
{true, rabbit_quorum_queue} ->
ok
end.
Expand Down Expand Up @@ -1809,43 +1828,45 @@ has_synchronised_mirrors_online(Q) ->
-spec on_node_up(node()) -> 'ok'.

on_node_up(Node) ->
rabbit_db_queue:on_node_up(Node, fun maybe_clear_recoverable_node/2).

maybe_clear_recoverable_node(Node, Q) ->
SPids = amqqueue:get_sync_slave_pids(Q),
RSs = amqqueue:get_recoverable_slaves(Q),
case lists:member(Node, RSs) of
true ->
%% There is a race with
%% rabbit_mirror_queue_slave:record_synchronised/1 called
%% by the incoming mirror node and this function, called
%% by the leader node. If this function is executed after
%% record_synchronised/1, the node is erroneously removed
%% from the recoverable mirror list.
%%
%% We check if the mirror node's queue PID is alive. If it is
%% the case, then this function is executed after. In this
%% situation, we don't touch the queue record, it is already
%% correct.
DoClearNode =
case [SP || SP <- SPids, node(SP) =:= Node] of
[SPid] -> not rabbit_misc:is_process_alive(SPid);
_ -> true
end,
if
DoClearNode -> RSs1 = RSs -- [Node],
store_queue(
amqqueue:set_recoverable_slaves(Q, RSs1));
true -> ok
end;
false ->
ok
rabbit_db_queue:foreach_transient(maybe_clear_recoverable_node(Node)).

maybe_clear_recoverable_node(Node) ->
fun(Q) ->
SPids = amqqueue:get_sync_slave_pids(Q),
RSs = amqqueue:get_recoverable_slaves(Q),
case lists:member(Node, RSs) of
true ->
%% There is a race with
%% rabbit_mirror_queue_slave:record_synchronised/1 called
%% by the incoming mirror node and this function, called
%% by the leader node. If this function is executed after
%% record_synchronised/1, the node is erroneously removed
%% from the recoverable mirror list.
%%
%% We check if the mirror node's queue PID is alive. If it is
%% the case, then this function is executed after. In this
%% situation, we don't touch the queue record, it is already
%% correct.
DoClearNode =
case [SP || SP <- SPids, node(SP) =:= Node] of
[SPid] -> not rabbit_misc:is_process_alive(SPid);
_ -> true
end,
if
DoClearNode -> RSs1 = RSs -- [Node],
store_queue(
amqqueue:set_recoverable_slaves(Q, RSs1));
true -> ok
end;
false ->
ok
end
end.

-spec on_node_down(node()) -> 'ok'.

on_node_down(Node) ->
{Time, Ret} = timer:tc(fun() -> rabbit_db_queue:on_node_down(Node, fun filter_transient_queues_to_delete/2) end),
{Time, Ret} = timer:tc(fun() -> rabbit_db_queue:delete_transient(filter_transient_queues_to_delete(Node)) end),
case Ret of
ok -> ok;
{QueueNames, Deletions} ->
Expand All @@ -1859,12 +1880,14 @@ on_node_down(Node) ->
ok
end.

filter_transient_queues_to_delete(Node, Q) ->
amqqueue:qnode(Q) == Node andalso
not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q))
andalso (not amqqueue:is_classic(Q) orelse not amqqueue:is_durable(Q))
andalso (not rabbit_amqqueue:is_replicated(Q)
orelse rabbit_amqqueue:is_dead_exclusive(Q)).
filter_transient_queues_to_delete(Node) ->
fun(Q) ->
amqqueue:qnode(Q) == Node andalso
not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q))
andalso (not amqqueue:is_classic(Q) orelse not amqqueue:is_durable(Q))
andalso (not rabbit_amqqueue:is_replicated(Q)
orelse rabbit_amqqueue:is_dead_exclusive(Q))
end.

notify_queue_binding_deletions(QueueDeletions) when is_list(QueueDeletions) ->
Deletions = rabbit_binding:process_deletions(
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_amqqueue_process.erl
Expand Up @@ -283,7 +283,7 @@ terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) ->
rabbit_core_metrics:queue_deleted(qname(State)),
terminate_shutdown(
fun (BQS) ->
update_state(stopped, Q0),
_ = update_state(stopped, Q0),
BQ:terminate(R, BQS)
end, State);
terminate({shutdown, missing_owner} = Reason, State) ->
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_basic.erl
Expand Up @@ -68,7 +68,7 @@ publish(Delivery = #delivery{
end.

publish(X, Delivery) ->
Qs = rabbit_amqqueue:lookup(rabbit_exchange:route(X, Delivery)),
Qs = rabbit_amqqueue:lookup_many(rabbit_exchange:route(X, Delivery)),
_ = rabbit_queue_type:deliver(Qs, Delivery, stateless),
ok.

Expand Down

0 comments on commit 9cf10ed

Please sign in to comment.