diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl
index 5f73f81c500a..2ef86b0203da 100644
--- a/deps/rabbit/src/rabbit_amqqueue.erl
+++ b/deps/rabbit/src/rabbit_amqqueue.erl
@@ -1818,8 +1818,8 @@ internal_delete(Queue, ActingUser, Reason) ->
{error, timeout} = Err ->
Err;
Deletions ->
- _ = rabbit_binding:process_deletions(Deletions),
- rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER),
+ ok = rabbit_binding:process_deletions(Deletions),
+ ok = rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER),
rabbit_core_metrics:queue_deleted(QueueName),
ok = rabbit_event:notify(queue_deleted,
[{name, QueueName},
@@ -1942,14 +1942,14 @@ filter_transient_queues_to_delete(Node) ->
end.
notify_queue_binding_deletions(QueueDeletions) when is_list(QueueDeletions) ->
- Deletions = rabbit_binding:process_deletions(
- lists:foldl(fun rabbit_binding:combine_deletions/2,
- rabbit_binding:new_deletions(),
- QueueDeletions)),
+ Deletions = lists:foldl(
+ fun rabbit_binding:combine_deletions/2,
+ rabbit_binding:new_deletions(), QueueDeletions),
+ ok = rabbit_binding:process_deletions(Deletions),
rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER);
notify_queue_binding_deletions(QueueDeletions) ->
- Deletions = rabbit_binding:process_deletions(QueueDeletions),
- rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER).
+ ok = rabbit_binding:process_deletions(QueueDeletions),
+ rabbit_binding:notify_deletions(QueueDeletions, ?INTERNAL_USER).
notify_transient_queues_deleted(QueueDeletions) ->
lists:foreach(
diff --git a/deps/rabbit/src/rabbit_binding.erl b/deps/rabbit/src/rabbit_binding.erl
index cf7f79b51e6a..bde550e2d0a6 100644
--- a/deps/rabbit/src/rabbit_binding.erl
+++ b/deps/rabbit/src/rabbit_binding.erl
@@ -13,7 +13,7 @@
-export([list/1, list_for_source/1, list_for_destination/1,
list_for_source_and_destination/2, list_for_source_and_destination/3,
list_explicit/0]).
--export([new_deletions/0, combine_deletions/2, add_deletion/3,
+-export([new_deletions/0, combine_deletions/2, add_deletion/5,
process_deletions/1, notify_deletions/2, group_bindings_fold/3]).
-export([info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4]).
@@ -22,6 +22,9 @@
-export([reverse_route/1, index_route/1]).
-export([binding_type/2]).
+%% For testing only
+-export([fetch_deletion/2]).
+
-define(DEFAULT_EXCHANGE(VHostPath), #resource{virtual_host = VHostPath,
kind = exchange,
name = <<>>}).
@@ -50,9 +53,12 @@
rabbit_types:ok_or_error(rabbit_types:amqp_error())).
-type bindings() :: [rabbit_types:binding()].
-%% TODO this should really be opaque but that seems to confuse 17.1's
-%% dialyzer into objecting to everything that uses it.
--type deletions() :: dict:dict().
+-record(deletion, {exchange :: rabbit_types:exchange(),
+ %% Whether the exchange was deleted.
+ deleted :: boolean(),
+ bindings :: sets:set(rabbit_types:binding())}).
+
+-opaque deletions() :: #{XName :: rabbit_exchange:name() => #deletion{}}.
%%----------------------------------------------------------------------------
@@ -159,6 +165,19 @@ binding_type0(false, true) ->
binding_type0(_, _) ->
transient.
+binding_checks(Binding, InnerFun) ->
+ fun(Src, Dst) ->
+ case rabbit_exchange:validate_binding(Src, Binding) of
+ ok ->
+ %% this argument is used to check queue exclusivity;
+ %% in general, we want to fail on that in preference to
+ %% anything else
+ InnerFun(Src, Dst);
+ Err ->
+ Err
+ end
+ end.
+
-spec remove(rabbit_types:binding(), rabbit_types:username()) -> bind_res().
remove(Binding, ActingUser) -> remove(Binding, fun (_Src, _Dst) -> ok end, ActingUser).
@@ -360,57 +379,96 @@ index_route(#route{binding = #binding{source = Source,
%% ----------------------------------------------------------------------------
%% Binding / exchange deletion abstraction API
%% ----------------------------------------------------------------------------
-
-anything_but( NotThis, NotThis, NotThis) -> NotThis;
-anything_but( NotThis, NotThis, This) -> This;
-anything_but( NotThis, This, NotThis) -> This;
-anything_but(_NotThis, This, This) -> This.
+%%
+%% `deletions()' describe a set of removals of bindings and/or exchanges from
+%% the metadata store.
+%%
+%% This deletion collection is used for two purposes:
+%%
+%%
+%% - "Processing" of deletions. Processing here means that the
+%% exchanges and bindings are passed into the {@link rabbit_exchange}
+%% callbacks. When an exchange is deleted the `rabbit_exchange:delete/1'
+%% callback is invoked and when the exchange is not deleted but some bindings
+%% are deleted the `rabbit_exchange:remove_bindings/2' is invoked.
+%% - Notification of metadata deletion. Like other internal
+%% notifications, {@link rabbit_binding:notify_deletions()} uses {@link
+%% rabbit_event} to notify any interested consumers of a resource deletion.
+%% An example consumer of {@link rabbit_event} is the `rabbitmq_event_exchange'
+%% plugin which publishes these notifications as messages.
+%%
+%%
+%% The point of collecting deletions into this opaque type is to be able to
+%% collect all bindings deleted for a given exchange into a list. This allows
+%% us to invoke the `rabbit_exchange:remove_bindings/2' callback with all
+%% deleted bindings at once rather than passing each deleted binding
+%% individually.
-spec new_deletions() -> deletions().
-new_deletions() -> dict:new().
-
--spec add_deletion
- (rabbit_exchange:name(),
- {'undefined' | rabbit_types:exchange(),
- 'deleted' | 'not_deleted',
- bindings()},
- deletions()) ->
- deletions().
-
-add_deletion(XName, Entry, Deletions) ->
- dict:update(XName, fun (Entry1) -> merge_entry(Entry1, Entry) end,
- Entry, Deletions).
+new_deletions() -> #{}.
+
+-spec add_deletion(XName, X, XDeleted, Bindings, Deletions) -> Deletions1
+ when
+ XName :: rabbit_exchange:name(),
+ X :: rabbit_types:exchange(),
+ XDeleted :: deleted | not_deleted,
+ Bindings :: bindings(),
+ Deletions :: deletions(),
+ Deletions1 :: deletions().
+
+add_deletion(XName, X, WasDeleted, Bindings, Deletions)
+ when (WasDeleted =:= deleted orelse WasDeleted =:= not_deleted) andalso
+ is_list(Bindings) andalso is_map(Deletions) ->
+ WasDeleted1 = case WasDeleted of
+ deleted -> true;
+ not_deleted -> false
+ end,
+ Bindings1 = sets:from_list(Bindings, [{version, 2}]),
+ Deletion = #deletion{exchange = X,
+ deleted = WasDeleted1,
+ bindings = Bindings1},
+ maps:update_with(
+ XName,
+ fun(Deletion1) ->
+ merge_deletion(Deletion1, Deletion)
+ end, Deletion, Deletions).
-spec combine_deletions(deletions(), deletions()) -> deletions().
-combine_deletions(Deletions1, Deletions2) ->
- dict:merge(fun (_XName, Entry1, Entry2) -> merge_entry(Entry1, Entry2) end,
- Deletions1, Deletions2).
-
-merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) ->
- {anything_but(undefined, X1, X2),
- anything_but(not_deleted, Deleted1, Deleted2),
- Bindings1 ++ Bindings2};
-merge_entry({X1, Deleted1, Bindings1, none}, {X2, Deleted2, Bindings2, none}) ->
- {anything_but(undefined, X1, X2),
- anything_but(not_deleted, Deleted1, Deleted2),
- Bindings1 ++ Bindings2, none}.
-
-notify_deletions({error, not_found}, _) ->
- ok;
-notify_deletions(Deletions, ActingUser) ->
- dict:fold(fun (XName, {_X, deleted, Bs, _}, ok) ->
- notify_exchange_deletion(XName, ActingUser),
- notify_bindings_deletion(Bs, ActingUser);
- (_XName, {_X, not_deleted, Bs, _}, ok) ->
- notify_bindings_deletion(Bs, ActingUser);
- (XName, {_X, deleted, Bs}, ok) ->
+combine_deletions(Deletions1, Deletions2)
+ when is_map(Deletions1) andalso is_map(Deletions2) ->
+ maps:merge_with(
+ fun (_XName, Deletion1, Deletion2) ->
+ merge_deletion(Deletion1, Deletion2)
+ end, Deletions1, Deletions2).
+
+merge_deletion(
+ #deletion{deleted = Deleted1, bindings = Bindings1},
+ #deletion{exchange = X2, deleted = Deleted2, bindings = Bindings2}) ->
+ %% Assume that X2 is more up to date than X1.
+ X = X2,
+ Deleted = Deleted1 orelse Deleted2,
+ Bindings = sets:union(Bindings1, Bindings2),
+ #deletion{exchange = X,
+ deleted = Deleted,
+ bindings = Bindings}.
+
+-spec notify_deletions(Deletions, ActingUser) -> ok when
+ Deletions :: rabbit_binding:deletions(),
+ ActingUser :: rabbit_types:username().
+
+notify_deletions(Deletions, ActingUser) when is_map(Deletions) ->
+ maps:foreach(
+ fun (XName, #deletion{deleted = XDeleted, bindings = Bindings}) ->
+ case XDeleted of
+ true ->
notify_exchange_deletion(XName, ActingUser),
- notify_bindings_deletion(Bs, ActingUser);
- (_XName, {_X, not_deleted, Bs}, ok) ->
- notify_bindings_deletion(Bs, ActingUser)
- end, ok, Deletions).
+ notify_bindings_deletion(Bindings, ActingUser);
+ false ->
+ notify_bindings_deletion(Bindings, ActingUser)
+ end
+ end, Deletions).
notify_exchange_deletion(XName, ActingUser) ->
ok = rabbit_event:notify(
@@ -418,35 +476,58 @@ notify_exchange_deletion(XName, ActingUser) ->
[{name, XName},
{user_who_performed_action, ActingUser}]).
-notify_bindings_deletion(Bs, ActingUser) ->
- [rabbit_event:notify(binding_deleted,
- info(B) ++ [{user_who_performed_action, ActingUser}])
- || B <- Bs],
- ok.
+notify_bindings_deletion(Bindings, ActingUser) ->
+ sets:fold(
+ fun(Binding, ok) ->
+ rabbit_event:notify(
+ binding_deleted,
+ info(Binding) ++ [{user_who_performed_action, ActingUser}]),
+ ok
+ end, ok, Bindings).
--spec process_deletions(deletions()) -> deletions().
+-spec process_deletions(deletions()) -> ok.
process_deletions(Deletions) ->
- dict:map(fun (_XName, {X, deleted, Bindings}) ->
- Bs = lists:flatten(Bindings),
- Serial = rabbit_exchange:serial(X),
- rabbit_exchange:callback(X, delete, Serial, [X]),
- {X, deleted, Bs, none};
- (_XName, {X, not_deleted, Bindings}) ->
- Bs = lists:flatten(Bindings),
- Serial = rabbit_exchange:serial(X),
- rabbit_exchange:callback(X, remove_bindings, Serial, [X, Bs]),
- {X, not_deleted, Bs, none}
- end, Deletions).
-
-binding_checks(Binding, InnerFun) ->
- fun(Src, Dst) ->
- case rabbit_exchange:validate_binding(Src, Binding) of
- ok ->
- %% this argument is used to check queue exclusivity;
- %% in general, we want to fail on that in preference to
- %% anything else
- InnerFun(Src, Dst);
- Err ->
- Err
- end
+ maps:foreach(
+ fun (_XName, #deletion{exchange = X,
+ deleted = XDeleted,
+ bindings = Bindings}) ->
+ Serial = rabbit_exchange:serial(X),
+ case XDeleted of
+ true ->
+ rabbit_exchange:callback(X, delete, Serial, [X]);
+ false ->
+ Bindings1 = sets:to_list(Bindings),
+ rabbit_exchange:callback(
+ X, remove_bindings, Serial, [X, Bindings1])
+ end
+ end, Deletions).
+
+-spec fetch_deletion(XName, Deletions) -> Ret when
+ XName :: rabbit_exchange:name(),
+ Deletions :: deletions(),
+ Ret :: {X, WasDeleted, Bindings},
+ X :: rabbit_types:exchange(),
+ WasDeleted :: deleted | not_deleted,
+ Bindings :: bindings().
+%% @doc Fetches the deletions for the given exchange name.
+%%
+%% This function is only intended for use in tests.
+%%
+%% @private
+
+fetch_deletion(XName, Deletions) ->
+ case maps:find(XName, Deletions) of
+ {ok, #deletion{exchange = X,
+ deleted = Deleted,
+ bindings = Bindings}} ->
+ WasDeleted = case Deleted of
+ true ->
+ deleted;
+ false ->
+ not_deleted
+ end,
+ Bindings1 = sets:to_list(Bindings),
+ {X, WasDeleted, Bindings1};
+ error ->
+ error
end.
diff --git a/deps/rabbit/src/rabbit_db_binding.erl b/deps/rabbit/src/rabbit_db_binding.erl
index 942b3a648110..9bb02277ca52 100644
--- a/deps/rabbit/src/rabbit_db_binding.erl
+++ b/deps/rabbit/src/rabbit_db_binding.erl
@@ -302,7 +302,10 @@ delete_in_mnesia(Src, Dst, B) ->
should_index_table(Src), fun delete/3),
Deletions0 = maybe_auto_delete_exchange_in_mnesia(
B#binding.source, [B], rabbit_binding:new_deletions(), false),
- fun() -> {ok, rabbit_binding:process_deletions(Deletions0)} end.
+ fun() ->
+ ok = rabbit_binding:process_deletions(Deletions0),
+ {ok, Deletions0}
+ end.
absent_errs_only_in_mnesia(Names) ->
Errs = [E || Name <- Names,
@@ -352,7 +355,8 @@ delete_in_khepri(#binding{source = SrcName,
{error, _} = Err ->
Err;
Deletions ->
- {ok, rabbit_binding:process_deletions(Deletions)}
+ ok = rabbit_binding:process_deletions(Deletions),
+ {ok, Deletions}
end.
exists_in_khepri(Path, Binding) ->
@@ -379,15 +383,18 @@ delete_in_khepri(Binding) ->
end.
maybe_auto_delete_exchange_in_khepri(XName, Bindings, Deletions, OnlyDurable) ->
- {Entry, Deletions1} =
- case rabbit_db_exchange:maybe_auto_delete_in_khepri(XName, OnlyDurable) of
- {not_deleted, X} ->
- {{X, not_deleted, Bindings}, Deletions};
- {deleted, X, Deletions2} ->
- {{X, deleted, Bindings},
- rabbit_binding:combine_deletions(Deletions, Deletions2)}
- end,
- rabbit_binding:add_deletion(XName, Entry, Deletions1).
+ case rabbit_db_exchange:maybe_auto_delete_in_khepri(XName, OnlyDurable) of
+ {not_deleted, undefined} ->
+ Deletions;
+ {not_deleted, X} ->
+ rabbit_binding:add_deletion(
+ XName, X, not_deleted, Bindings, Deletions);
+ {deleted, X, Deletions1} ->
+ Deletions2 = rabbit_binding:combine_deletions(
+ Deletions, Deletions1),
+ rabbit_binding:add_deletion(
+ XName, X, deleted, Bindings, Deletions2)
+ end.
%% -------------------------------------------------------------------
%% get_all().
@@ -1152,15 +1159,18 @@ sync_index_route(_, _, _) ->
OnlyDurable :: boolean(),
Ret :: rabbit_binding:deletions().
maybe_auto_delete_exchange_in_mnesia(XName, Bindings, Deletions, OnlyDurable) ->
- {Entry, Deletions1} =
- case rabbit_db_exchange:maybe_auto_delete_in_mnesia(XName, OnlyDurable) of
- {not_deleted, X} ->
- {{X, not_deleted, Bindings}, Deletions};
- {deleted, X, Deletions2} ->
- {{X, deleted, Bindings},
- rabbit_binding:combine_deletions(Deletions, Deletions2)}
- end,
- rabbit_binding:add_deletion(XName, Entry, Deletions1).
+ case rabbit_db_exchange:maybe_auto_delete_in_mnesia(XName, OnlyDurable) of
+ {not_deleted, undefined} ->
+ Deletions;
+ {not_deleted, X} ->
+ rabbit_binding:add_deletion(
+ XName, X, not_deleted, Bindings, Deletions);
+ {deleted, X, Deletions1} ->
+ Deletions2 = rabbit_binding:combine_deletions(
+ Deletions, Deletions1),
+ rabbit_binding:add_deletion(
+ XName, X, deleted, Bindings, Deletions2)
+ end.
%% Instead of locking entire table on remove operations we can lock the
%% affected resource only.
diff --git a/deps/rabbit/src/rabbit_db_exchange.erl b/deps/rabbit/src/rabbit_db_exchange.erl
index f8c37a22428f..ef6b9f3c61aa 100644
--- a/deps/rabbit/src/rabbit_db_exchange.erl
+++ b/deps/rabbit/src/rabbit_db_exchange.erl
@@ -573,7 +573,7 @@ next_serial_in_khepri_tx(#exchange{name = XName}) ->
IfUnused :: boolean(),
Exchange :: rabbit_types:exchange(),
Binding :: rabbit_types:binding(),
- Deletions :: dict:dict(),
+ Deletions :: rabbit_binding:deletions(),
Ret :: {deleted, Exchange, [Binding], Deletions} |
{error, not_found} |
{error, in_use} |
@@ -624,7 +624,7 @@ unconditional_delete_in_mnesia(X, OnlyDurable) ->
RemoveBindingsForSource :: boolean(),
Exchange :: rabbit_types:exchange(),
Binding :: rabbit_types:binding(),
- Deletions :: dict:dict(),
+ Deletions :: rabbit_binding:deletions(),
Ret :: {error, not_found} | {error, in_use} | {deleted, Exchange, [Binding], Deletions}.
delete_in_mnesia(X = #exchange{name = XName}, OnlyDurable, RemoveBindingsForSource) ->
ok = mnesia:delete({?MNESIA_TABLE, XName}),
@@ -695,7 +695,7 @@ delete_all_in_mnesia_tx(VHostName) ->
{deleted, #exchange{name = XName}, Bindings, XDeletions} =
unconditional_delete_in_mnesia( X, false),
XDeletions1 = rabbit_binding:add_deletion(
- XName, {X, deleted, Bindings}, XDeletions),
+ XName, X, deleted, Bindings, XDeletions),
rabbit_binding:combine_deletions(Acc, XDeletions1)
end, rabbit_binding:new_deletions(), Xs),
{ok, Deletions}.
@@ -716,7 +716,7 @@ delete_all_in_khepri_tx(VHostName) ->
rabbit_db_binding:delete_all_for_exchange_in_khepri(
X, false, true),
Deletions1 = rabbit_binding:add_deletion(
- XName, {X, deleted, Bindings}, XDeletions),
+ XName, X, deleted, Bindings, XDeletions),
rabbit_binding:combine_deletions(Deletions, Deletions1)
end, rabbit_binding:new_deletions(), NodeProps),
{ok, Deletions}.
diff --git a/deps/rabbit/src/rabbit_exchange.erl b/deps/rabbit/src/rabbit_exchange.erl
index b4037f9a8078..391b6b8934e0 100644
--- a/deps/rabbit/src/rabbit_exchange.erl
+++ b/deps/rabbit/src/rabbit_exchange.erl
@@ -470,13 +470,15 @@ delete(XName, IfUnused, Username) ->
_ = rabbit_runtime_parameters:set(XName#resource.virtual_host,
?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
XName#resource.name, true, Username),
- Deletions = process_deletions(rabbit_db_exchange:delete(XName, IfUnused)),
- case Deletions of
- {error, _} ->
- Deletions;
- _ ->
- rabbit_binding:notify_deletions(Deletions, Username),
- ok
+ case rabbit_db_exchange:delete(XName, IfUnused) of
+ {deleted, #exchange{name = XName} = X, Bs, Deletions} ->
+ Deletions1 = rabbit_binding:add_deletion(
+ XName, X, deleted, Bs, Deletions),
+ ok = rabbit_binding:process_deletions(Deletions1),
+ ok = rabbit_binding:notify_deletions(Deletions1, Username),
+ ok;
+ {error, _} = Err ->
+ Err
end
after
rabbit_runtime_parameters:clear(XName#resource.virtual_host,
@@ -491,17 +493,10 @@ delete(XName, IfUnused, Username) ->
delete_all(VHostName, ActingUser) ->
{ok, Deletions} = rabbit_db_exchange:delete_all(VHostName),
- Deletions1 = rabbit_binding:process_deletions(Deletions),
- rabbit_binding:notify_deletions(Deletions1, ActingUser),
+ ok = rabbit_binding:process_deletions(Deletions),
+ ok = rabbit_binding:notify_deletions(Deletions, ActingUser),
ok.
-process_deletions({error, _} = E) ->
- E;
-process_deletions({deleted, #exchange{name = XName} = X, Bs, Deletions}) ->
- rabbit_binding:process_deletions(
- rabbit_binding:add_deletion(
- XName, {X, deleted, Bs}, Deletions)).
-
-spec ensure_deleted(ExchangeName, IfUnused, Username) -> Ret when
ExchangeName :: name(),
IfUnused :: boolean(),
diff --git a/deps/rabbit/test/rabbit_db_binding_SUITE.erl b/deps/rabbit/test/rabbit_db_binding_SUITE.erl
index 9055e4ff1ddb..07eb0aea09d0 100644
--- a/deps/rabbit/test/rabbit_db_binding_SUITE.erl
+++ b/deps/rabbit/test/rabbit_db_binding_SUITE.erl
@@ -131,8 +131,8 @@ delete1(_Config) ->
Ret = rabbit_db_binding:delete(Binding, fun(_, _) -> ok end),
?assertMatch({ok, _}, Ret),
{ok, Deletions} = Ret,
- ?assertMatch({#exchange{}, not_deleted, [#binding{}], none},
- dict:fetch(XName1, Deletions)),
+ ?assertMatch({#exchange{}, not_deleted, [#binding{}]},
+ rabbit_binding:fetch_deletion(XName1, Deletions)),
?assertEqual(false, rabbit_db_binding:exists(Binding)),
passed.
@@ -152,8 +152,8 @@ auto_delete1(_Config) ->
Ret = rabbit_db_binding:delete(Binding, fun(_, _) -> ok end),
?assertMatch({ok, _}, Ret),
{ok, Deletions} = Ret,
- ?assertMatch({#exchange{}, deleted, [#binding{}], none},
- dict:fetch(XName1, Deletions)),
+ ?assertMatch({#exchange{}, not_deleted, [#binding{}]},
+ rabbit_binding:fetch_deletion(XName1, Deletions)),
?assertEqual(false, rabbit_db_binding:exists(Binding)),
passed.
diff --git a/deps/rabbit/test/rabbit_db_queue_SUITE.erl b/deps/rabbit/test/rabbit_db_queue_SUITE.erl
index f66e8fd236c9..06ff1a4889d2 100644
--- a/deps/rabbit/test/rabbit_db_queue_SUITE.erl
+++ b/deps/rabbit/test/rabbit_db_queue_SUITE.erl
@@ -292,8 +292,8 @@ delete1(_Config) ->
?assertEqual({ok, Q}, rabbit_db_queue:get(QName)),
%% TODO Can we handle the deletions outside of rabbit_db_queue? Probably not because
%% they should be done in a single transaction, but what a horrid API to have!
- Dict = rabbit_db_queue:delete(QName, normal),
- ?assertEqual(0, dict:size(Dict)),
+ Deletions = rabbit_db_queue:delete(QName, normal),
+ ?assertEqual(rabbit_binding:new_deletions(), Deletions),
?assertEqual(ok, rabbit_db_queue:delete(QName, normal)),
?assertEqual({error, not_found}, rabbit_db_queue:get(QName)),
passed.