Skip to content

Commit

Permalink
Fix review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dcorbacho committed Jan 31, 2023
1 parent 36c2626 commit eba2456
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 16 deletions.
4 changes: 4 additions & 0 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1731,6 +1731,10 @@ forget_node_for_queue(DeadNode, [H|T], Q) when ?is_amqqueue(Q) ->
{false, _} -> forget_node_for_queue(DeadNode, T, Q);
{true, rabbit_classic_queue} ->
Q1 = amqqueue:set_pid(Q, rabbit_misc:node_to_fake_pid(H)),
%% rabbit_db_queue:set_many/1 just stores a durable queue record,
%% that is the only one required here.
%% rabbit_db_queue:set/1 writes both durable and transient, thus
%% can't be used for this operation.
ok = rabbit_db_queue:set_many([Q1]);
{true, rabbit_quorum_queue} ->
ok
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_classic_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ is_recoverable(Q) when ?is_amqqueue(Q) ->
%% record if it is a mirrored queue (such info is now obtained from
%% the policy). Thus, we must check if the local pid is alive
%% - if the record is present - in order to restart.
(not rabbit_amqqueue:exists(amqqueue:get_name(Q))
(not rabbit_db_queue:consistent_exists(amqqueue:get_name(Q))
orelse not rabbit_mnesia:is_process_alive(amqqueue:get_pid(Q))).

recover(VHost, Queues) ->
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ not_found_or_absent_errs_in_mnesia(Names) ->
ChecksFun :: fun((Src, Dst) -> ok | {error, Reason :: any()}),
Ret :: ok | {error, Reason :: any()}.
%% @doc Writes a binding if it doesn't exist already and passes the validation in
%% `ChecksFun` i.e. exclusive access
%% `ChecksFun' i.e. exclusive access
%%
%% @returns ok, or an error if the validation has failed.
%%
Expand Down Expand Up @@ -147,7 +147,7 @@ create_in_mnesia(Binding, ChecksFun) ->
ChecksFun :: fun((Src, Dst) -> ok | {error, Reason :: any()}),
Ret :: ok | {ok, rabbit_binding:deletions()} | {error, Reason :: any()}.
%% @doc Deletes a binding record from the database if it passes the validation in
%% `ChecksFun`. It also triggers the deletion of auto-delete exchanges if needed.
%% `ChecksFun'. It also triggers the deletion of auto-delete exchanges if needed.
%%
%% @private

Expand Down
6 changes: 3 additions & 3 deletions deps/rabbit/src/rabbit_db_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,8 @@ next_serial_in_mnesia_tx(XName) ->
Binding :: rabbit_types:binding(),
Deletions :: dict:dict(),
Ret :: {error, not_found} | {error, in_use} | {deleted, Exchange, [Binding], Deletions}.
%% @doc Deletes an exchange record from the database. If `IfUnused` is set
%% to `true`, it is only deleted when there are no bindings present on the
%% @doc Deletes an exchange record from the database. If `IfUnused' is set
%% to `true', it is only deleted when there are no bindings present on the
%% exchange.
%%
%% @returns an error if the exchange does not exist or a tuple with the exchange,
Expand Down Expand Up @@ -471,7 +471,7 @@ recover_in_mnesia(VHost) ->
X;
(X, false) ->
X1 = rabbit_mnesia:execute_mnesia_transaction(
fun() -> set_in_mnesia_tx(X) end),
fun() -> set_ram_in_mnesia_tx(X) end),
Serial = rabbit_exchange:serial(X1),
rabbit_exchange:callback(X1, create, Serial, [X1])
end,
Expand Down
46 changes: 38 additions & 8 deletions deps/rabbit/src/rabbit_db_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
filter_all_durable/1,
update_durable/2,
get_durable/1,
get_many_durable/1
get_many_durable/1,
consistent_exists/1
]).

%% Used by on_node_up and on_node_down
Expand Down Expand Up @@ -294,7 +295,7 @@ delete_in_mnesia(QueueName, Reason) ->

internal_delete(QueueName, OnlyDurable, Reason) ->
%% Only used by rabbit_amqqueue:forget_node_for_queue, which is only called
%% by `rabbit_mnesia:remove_node_if_mnesia_running`. Thus, once mnesia and/or
%% by `rabbit_mnesia:remove_node_if_mnesia_running'. Thus, once mnesia and/or
%% HA queues are removed it can be removed.
rabbit_db:run(
#{mnesia => fun() -> internal_delete_in_mnesia(QueueName, OnlyDurable, Reason) end
Expand Down Expand Up @@ -480,6 +481,31 @@ exists(QName) ->
exists_in_mnesia(QName) ->
ets:member(?MNESIA_TABLE, QName).

%% -------------------------------------------------------------------
%% exists().
%% -------------------------------------------------------------------

-spec consistent_exists(QName) -> Exists when
QName :: rabbit_amqqueue:name(),
Exists :: boolean().
%% @doc Indicates if queue named `QName' exists using a consistent read.
%% Just used by `rabbit_classic_queue:is_recoverable` for transient queues.
%%
%% @returns true if the queue exists, false otherwise.
%%
%% @private

consistent_exists(QName) ->
rabbit_db:run(
#{mnesia => fun() -> consistent_exists_in_mnesia(QName) end
}).

consistent_exists_in_mnesia(QName) ->
case mnesia:read({?MNESIA_TABLE, QName}) of
[] -> false;
[_] -> true
end.

%% -------------------------------------------------------------------
%% get_all_by_type().
%% -------------------------------------------------------------------
Expand Down Expand Up @@ -577,7 +603,9 @@ create_or_get_in_mnesia(Q) ->

-spec set(Queue) -> ok when
Queue :: amqqueue:amqqueue().
%% @doc Writes a queue record
%% @doc Writes a queue record. If the queue is durable, it writes both instances:
%% durable and transient. For the durable one, it resets mirrors and decorators.
%% The transient one is left as it is.
%%
%% @private

Expand Down Expand Up @@ -608,7 +636,9 @@ set_in_mnesia_tx(DurableQ, Q) ->

-spec set_many([Queue]) -> ok when
Queue :: amqqueue:amqqueue().
%% @doc Writes a list of queue records
%% @doc Writes a list of durable queue records.
%% It is responsibility of the calling function to ensure all records are durable.
%% Once transient entities are deprecated, this is a non-issue.
%%
%% @private

Expand Down Expand Up @@ -819,15 +849,15 @@ clear_in_mnesia() ->

list_with_possible_retry_in_mnesia(Fun) ->
%% amqqueue migration:
%% The `rabbit_queue` or `rabbit_durable_queue` tables
%% The `rabbit_queue' or `rabbit_durable_queue' tables
%% might be migrated between the time we query the pattern
%% (with the `amqqueue` module) and the time we call
%% `mnesia:dirty_match_object()`. This would lead to an empty list
%% (with the `amqqueue' module) and the time we call
%% `mnesia:dirty_match_object()'. This would lead to an empty list
%% (no object matching the now incorrect pattern), not a Mnesia
%% error.
%%
%% So if the result is an empty list and the version of the
%% `amqqueue` record changed in between, we retry the operation.
%% `amqqueue' record changed in between, we retry the operation.
%%
%% However, we don't do this if inside a Mnesia transaction: we
%% could end up with a live lock between this started transaction
Expand Down
14 changes: 12 additions & 2 deletions deps/rabbit/src/rabbit_maintenance.erl
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,21 @@ is_being_drained_consistent_read(Node) ->

-spec status_local_read(node()) -> maintenance_status().
status_local_read(Node) ->
rabbit_db_maintenance:get(Node).
case rabbit_db_maintenance:get(Node) of
undefined ->
?DEFAULT_STATUS;
Status ->
Status
end.

-spec status_consistent_read(node()) -> maintenance_status().
status_consistent_read(Node) ->
rabbit_db_maintenance:get_consistent(Node).
case rabbit_db_maintenance:get_consistent(Node) of
undefined ->
?DEFAULT_STATUS;
Status ->
Status
end.

-spec filter_out_drained_nodes_local_read([node()]) -> [node()].
filter_out_drained_nodes_local_read(Nodes) ->
Expand Down
2 changes: 2 additions & 0 deletions deps/rabbit/src/rabbit_policy.erl
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ recover0() ->
Q2 = amqqueue:set_operator_policy(Q1, OpPolicy1),
rabbit_queue_decorator:set(Q2)
end || Q0 <- Qs],
%% This function is just used to recover policies, thus no transient entities
%% are considered for this process as there is none to recover on boot.
_ = rabbit_db_queue:set_many(Qs0),
ok.

Expand Down

0 comments on commit eba2456

Please sign in to comment.