Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1818,8 +1818,8 @@ internal_delete(Queue, ActingUser, Reason) ->
{error, timeout} = Err ->
Err;
Deletions ->
_ = rabbit_binding:process_deletions(Deletions),
rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER),
ok = rabbit_binding:process_deletions(Deletions),
ok = rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER),
rabbit_core_metrics:queue_deleted(QueueName),
ok = rabbit_event:notify(queue_deleted,
[{name, QueueName},
Expand Down Expand Up @@ -1942,14 +1942,14 @@ filter_transient_queues_to_delete(Node) ->
end.

notify_queue_binding_deletions(QueueDeletions) when is_list(QueueDeletions) ->
Deletions = rabbit_binding:process_deletions(
lists:foldl(fun rabbit_binding:combine_deletions/2,
rabbit_binding:new_deletions(),
QueueDeletions)),
Deletions = lists:foldl(
fun rabbit_binding:combine_deletions/2,
rabbit_binding:new_deletions(), QueueDeletions),
ok = rabbit_binding:process_deletions(Deletions),
rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER);
notify_queue_binding_deletions(QueueDeletions) ->
Deletions = rabbit_binding:process_deletions(QueueDeletions),
rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER).
ok = rabbit_binding:process_deletions(QueueDeletions),
rabbit_binding:notify_deletions(QueueDeletions, ?INTERNAL_USER).

notify_transient_queues_deleted(QueueDeletions) ->
lists:foreach(
Expand Down
235 changes: 158 additions & 77 deletions deps/rabbit/src/rabbit_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
-export([list/1, list_for_source/1, list_for_destination/1,
list_for_source_and_destination/2, list_for_source_and_destination/3,
list_explicit/0]).
-export([new_deletions/0, combine_deletions/2, add_deletion/3,
-export([new_deletions/0, combine_deletions/2, add_deletion/5,
process_deletions/1, notify_deletions/2, group_bindings_fold/3]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4]).

Expand All @@ -22,6 +22,9 @@
-export([reverse_route/1, index_route/1]).
-export([binding_type/2]).

%% For testing only
-export([fetch_deletion/2]).

-define(DEFAULT_EXCHANGE(VHostPath), #resource{virtual_host = VHostPath,
kind = exchange,
name = <<>>}).
Expand Down Expand Up @@ -50,9 +53,12 @@
rabbit_types:ok_or_error(rabbit_types:amqp_error())).
-type bindings() :: [rabbit_types:binding()].

%% TODO this should really be opaque but that seems to confuse 17.1's
%% dialyzer into objecting to everything that uses it.
-type deletions() :: dict:dict().
-record(deletion, {exchange :: rabbit_types:exchange(),
%% Whether the exchange was deleted.
deleted :: boolean(),
bindings :: sets:set(rabbit_types:binding())}).

-opaque deletions() :: #{XName :: rabbit_exchange:name() => #deletion{}}.

%%----------------------------------------------------------------------------

Expand Down Expand Up @@ -159,6 +165,19 @@ binding_type0(false, true) ->
binding_type0(_, _) ->
transient.

binding_checks(Binding, InnerFun) ->
fun(Src, Dst) ->
case rabbit_exchange:validate_binding(Src, Binding) of
ok ->
%% this argument is used to check queue exclusivity;
%% in general, we want to fail on that in preference to
%% anything else
InnerFun(Src, Dst);
Err ->
Err
end
end.

-spec remove(rabbit_types:binding(), rabbit_types:username()) -> bind_res().
remove(Binding, ActingUser) -> remove(Binding, fun (_Src, _Dst) -> ok end, ActingUser).

Expand Down Expand Up @@ -360,93 +379,155 @@ index_route(#route{binding = #binding{source = Source,
%% ----------------------------------------------------------------------------
%% Binding / exchange deletion abstraction API
%% ----------------------------------------------------------------------------

anything_but( NotThis, NotThis, NotThis) -> NotThis;
anything_but( NotThis, NotThis, This) -> This;
anything_but( NotThis, This, NotThis) -> This;
anything_but(_NotThis, This, This) -> This.
%%
%% `deletions()' describe a set of removals of bindings and/or exchanges from
%% the metadata store.
%%
%% This deletion collection is used for two purposes:
%%
%% <ul>
%% <li>"<em>Processing</em>" of deletions. Processing here means that the
%% exchanges and bindings are passed into the {@link rabbit_exchange}
%% callbacks. When an exchange is deleted the `rabbit_exchange:delete/1'
%% callback is invoked and when the exchange is not deleted but some bindings
%% are deleted the `rabbit_exchange:remove_bindings/2' is invoked.</li>
%% <li><em>Notification</em> of metadata deletion. Like other internal
%% notifications, {@link rabbit_binding:notify_deletions()} uses {@link
%% rabbit_event} to notify any interested consumers of a resource deletion.
%% An example consumer of {@link rabbit_event} is the `rabbitmq_event_exchange'
%% plugin which publishes these notifications as messages.</li>
%% </ul>
%%
%% The point of collecting deletions into this opaque type is to be able to
%% collect all bindings deleted for a given exchange into a list. This allows
%% us to invoke the `rabbit_exchange:remove_bindings/2' callback with all
%% deleted bindings at once rather than passing each deleted binding
%% individually.

-spec new_deletions() -> deletions().

new_deletions() -> dict:new().

-spec add_deletion
(rabbit_exchange:name(),
{'undefined' | rabbit_types:exchange(),
'deleted' | 'not_deleted',
bindings()},
deletions()) ->
deletions().

add_deletion(XName, Entry, Deletions) ->
dict:update(XName, fun (Entry1) -> merge_entry(Entry1, Entry) end,
Entry, Deletions).
new_deletions() -> #{}.

-spec add_deletion(XName, X, XDeleted, Bindings, Deletions) -> Deletions1
when
XName :: rabbit_exchange:name(),
X :: rabbit_types:exchange(),
XDeleted :: deleted | not_deleted,
Bindings :: bindings(),
Deletions :: deletions(),
Deletions1 :: deletions().

add_deletion(XName, X, WasDeleted, Bindings, Deletions)
when (WasDeleted =:= deleted orelse WasDeleted =:= not_deleted) andalso
is_list(Bindings) andalso is_map(Deletions) ->
WasDeleted1 = case WasDeleted of
deleted -> true;
not_deleted -> false
end,
Bindings1 = sets:from_list(Bindings, [{version, 2}]),
Deletion = #deletion{exchange = X,
deleted = WasDeleted1,
bindings = Bindings1},
maps:update_with(
XName,
fun(Deletion1) ->
merge_deletion(Deletion1, Deletion)
end, Deletion, Deletions).

-spec combine_deletions(deletions(), deletions()) -> deletions().

combine_deletions(Deletions1, Deletions2) ->
dict:merge(fun (_XName, Entry1, Entry2) -> merge_entry(Entry1, Entry2) end,
Deletions1, Deletions2).

merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
{anything_but(undefined, X1, X2),
anything_but(not_deleted, Deleted1, Deleted2),
Bindings1 ++ Bindings2};
merge_entry({X1, Deleted1, Bindings1, none}, {X2, Deleted2, Bindings2, none}) ->
{anything_but(undefined, X1, X2),
anything_but(not_deleted, Deleted1, Deleted2),
Bindings1 ++ Bindings2, none}.

notify_deletions({error, not_found}, _) ->
ok;
notify_deletions(Deletions, ActingUser) ->
dict:fold(fun (XName, {_X, deleted, Bs, _}, ok) ->
notify_exchange_deletion(XName, ActingUser),
notify_bindings_deletion(Bs, ActingUser);
(_XName, {_X, not_deleted, Bs, _}, ok) ->
notify_bindings_deletion(Bs, ActingUser);
(XName, {_X, deleted, Bs}, ok) ->
combine_deletions(Deletions1, Deletions2)
when is_map(Deletions1) andalso is_map(Deletions2) ->
maps:merge_with(
fun (_XName, Deletion1, Deletion2) ->
merge_deletion(Deletion1, Deletion2)
end, Deletions1, Deletions2).

merge_deletion(
#deletion{deleted = Deleted1, bindings = Bindings1},
#deletion{exchange = X2, deleted = Deleted2, bindings = Bindings2}) ->
%% Assume that X2 is more up to date than X1.
X = X2,
Deleted = Deleted1 orelse Deleted2,
Bindings = sets:union(Bindings1, Bindings2),
#deletion{exchange = X,
deleted = Deleted,
bindings = Bindings}.

-spec notify_deletions(Deletions, ActingUser) -> ok when
Deletions :: rabbit_binding:deletions(),
ActingUser :: rabbit_types:username().

notify_deletions(Deletions, ActingUser) when is_map(Deletions) ->
maps:foreach(
fun (XName, #deletion{deleted = XDeleted, bindings = Bindings}) ->
case XDeleted of
true ->
notify_exchange_deletion(XName, ActingUser),
notify_bindings_deletion(Bs, ActingUser);
(_XName, {_X, not_deleted, Bs}, ok) ->
notify_bindings_deletion(Bs, ActingUser)
end, ok, Deletions).
notify_bindings_deletion(Bindings, ActingUser);
false ->
notify_bindings_deletion(Bindings, ActingUser)
end
end, Deletions).

notify_exchange_deletion(XName, ActingUser) ->
ok = rabbit_event:notify(
exchange_deleted,
[{name, XName},
{user_who_performed_action, ActingUser}]).

notify_bindings_deletion(Bs, ActingUser) ->
[rabbit_event:notify(binding_deleted,
info(B) ++ [{user_who_performed_action, ActingUser}])
|| B <- Bs],
ok.
notify_bindings_deletion(Bindings, ActingUser) ->
sets:fold(
fun(Binding, ok) ->
rabbit_event:notify(
binding_deleted,
info(Binding) ++ [{user_who_performed_action, ActingUser}]),
ok
end, ok, Bindings).

-spec process_deletions(deletions()) -> deletions().
-spec process_deletions(deletions()) -> ok.
process_deletions(Deletions) ->
dict:map(fun (_XName, {X, deleted, Bindings}) ->
Bs = lists:flatten(Bindings),
Serial = rabbit_exchange:serial(X),
rabbit_exchange:callback(X, delete, Serial, [X]),
{X, deleted, Bs, none};
(_XName, {X, not_deleted, Bindings}) ->
Bs = lists:flatten(Bindings),
Serial = rabbit_exchange:serial(X),
rabbit_exchange:callback(X, remove_bindings, Serial, [X, Bs]),
{X, not_deleted, Bs, none}
end, Deletions).

binding_checks(Binding, InnerFun) ->
fun(Src, Dst) ->
case rabbit_exchange:validate_binding(Src, Binding) of
ok ->
%% this argument is used to check queue exclusivity;
%% in general, we want to fail on that in preference to
%% anything else
InnerFun(Src, Dst);
Err ->
Err
end
maps:foreach(
fun (_XName, #deletion{exchange = X,
deleted = XDeleted,
bindings = Bindings}) ->
Serial = rabbit_exchange:serial(X),
case XDeleted of
true ->
rabbit_exchange:callback(X, delete, Serial, [X]);
false ->
Bindings1 = sets:to_list(Bindings),
rabbit_exchange:callback(
X, remove_bindings, Serial, [X, Bindings1])
end
end, Deletions).

-spec fetch_deletion(XName, Deletions) -> Ret when
XName :: rabbit_exchange:name(),
Deletions :: deletions(),
Ret :: {X, WasDeleted, Bindings},
X :: rabbit_types:exchange(),
WasDeleted :: deleted | not_deleted,
Bindings :: bindings().
%% @doc Fetches the deletions for the given exchange name.
%%
%% This function is only intended for use in tests.
%%
%% @private

fetch_deletion(XName, Deletions) ->
case maps:find(XName, Deletions) of
{ok, #deletion{exchange = X,
deleted = Deleted,
bindings = Bindings}} ->
WasDeleted = case Deleted of
true ->
deleted;
false ->
not_deleted
end,
Bindings1 = sets:to_list(Bindings),
{X, WasDeleted, Bindings1};
error ->
error
end.
50 changes: 30 additions & 20 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,10 @@ delete_in_mnesia(Src, Dst, B) ->
should_index_table(Src), fun delete/3),
Deletions0 = maybe_auto_delete_exchange_in_mnesia(
B#binding.source, [B], rabbit_binding:new_deletions(), false),
fun() -> {ok, rabbit_binding:process_deletions(Deletions0)} end.
fun() ->
ok = rabbit_binding:process_deletions(Deletions0),
{ok, Deletions0}
end.

absent_errs_only_in_mnesia(Names) ->
Errs = [E || Name <- Names,
Expand Down Expand Up @@ -352,7 +355,8 @@ delete_in_khepri(#binding{source = SrcName,
{error, _} = Err ->
Err;
Deletions ->
{ok, rabbit_binding:process_deletions(Deletions)}
ok = rabbit_binding:process_deletions(Deletions),
{ok, Deletions}
end.

exists_in_khepri(Path, Binding) ->
Expand All @@ -379,15 +383,18 @@ delete_in_khepri(Binding) ->
end.

maybe_auto_delete_exchange_in_khepri(XName, Bindings, Deletions, OnlyDurable) ->
{Entry, Deletions1} =
case rabbit_db_exchange:maybe_auto_delete_in_khepri(XName, OnlyDurable) of
{not_deleted, X} ->
{{X, not_deleted, Bindings}, Deletions};
{deleted, X, Deletions2} ->
{{X, deleted, Bindings},
rabbit_binding:combine_deletions(Deletions, Deletions2)}
end,
rabbit_binding:add_deletion(XName, Entry, Deletions1).
case rabbit_db_exchange:maybe_auto_delete_in_khepri(XName, OnlyDurable) of
{not_deleted, undefined} ->
Deletions;
{not_deleted, X} ->
rabbit_binding:add_deletion(
XName, X, not_deleted, Bindings, Deletions);
{deleted, X, Deletions1} ->
Deletions2 = rabbit_binding:combine_deletions(
Deletions, Deletions1),
rabbit_binding:add_deletion(
XName, X, deleted, Bindings, Deletions2)
end.

%% -------------------------------------------------------------------
%% get_all().
Expand Down Expand Up @@ -1152,15 +1159,18 @@ sync_index_route(_, _, _) ->
OnlyDurable :: boolean(),
Ret :: rabbit_binding:deletions().
maybe_auto_delete_exchange_in_mnesia(XName, Bindings, Deletions, OnlyDurable) ->
{Entry, Deletions1} =
case rabbit_db_exchange:maybe_auto_delete_in_mnesia(XName, OnlyDurable) of
{not_deleted, X} ->
{{X, not_deleted, Bindings}, Deletions};
{deleted, X, Deletions2} ->
{{X, deleted, Bindings},
rabbit_binding:combine_deletions(Deletions, Deletions2)}
end,
rabbit_binding:add_deletion(XName, Entry, Deletions1).
case rabbit_db_exchange:maybe_auto_delete_in_mnesia(XName, OnlyDurable) of
{not_deleted, undefined} ->
Deletions;
{not_deleted, X} ->
rabbit_binding:add_deletion(
XName, X, not_deleted, Bindings, Deletions);
{deleted, X, Deletions1} ->
Deletions2 = rabbit_binding:combine_deletions(
Deletions, Deletions1),
rabbit_binding:add_deletion(
XName, X, deleted, Bindings, Deletions2)
end.

%% Instead of locking entire table on remove operations we can lock the
%% affected resource only.
Expand Down
Loading
Loading