From 4aa68ca4ddfbaff2a63bb3d5b7fdc424b034f434 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Mon, 30 Sep 2024 18:08:01 -0400 Subject: [PATCH] Represent `rabbit_binding:deletions()` with a map instead of dict The `dict:dict()` typing of `rabbit_binding` appears to be a historical artifact. `dict` has been superseded by `maps`. Switching to a map makes deletions easier to inspect manually and faster. Though if deletions grow so large that the map representation is important, manipulation of the deletions is unlikely to be expensive compared to any other operations that produced them, so performance is probably irrelevant. This commit refactors the bottom section of the `rabbit_binding` module to switch to a map, switch the `deletions()` type to an opaque, eliminating a TODO created when using Erlang/OTP 17.1, and the deletion value to a record. We eliminate some historical artifacts and "cruft": * Deletions taking multiple forms needlessly, specifically the shape `{X, deleted | not_deleted, Bindings, none}` no longer being handled. `process_deletions/2` was responsible for creating this shape. Instead we now use a record to clearly define the fields. * Clauses to catch `{error, not_found}` are unnecessary after minor refactors of the callers. Removing them makes the type specs cleaner. * `rabbit_binding:process_deletions/1` has no need to update or change the deletions. This function uses `maps:foreach/2` instead and returns `ok` instead of mapped deletions. * Remove `undefined` from the typespec of deletions. This value is no longer possible with a refactor to `maybe_auto_delete_exchange_in_*` functions for Mnesia and Khepri. The value was nonsensical since you cannot delete bindings for an exchange that does not exist. --- deps/rabbit/src/rabbit_amqqueue.erl | 16 +- deps/rabbit/src/rabbit_binding.erl | 235 +++++++++++++------ deps/rabbit/src/rabbit_db_binding.erl | 50 ++-- deps/rabbit/src/rabbit_db_exchange.erl | 8 +- deps/rabbit/src/rabbit_exchange.erl | 27 +-- deps/rabbit/test/rabbit_db_binding_SUITE.erl | 8 +- deps/rabbit/test/rabbit_db_queue_SUITE.erl | 4 +- 7 files changed, 217 insertions(+), 131 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 5f73f81c500a..2ef86b0203da 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -1818,8 +1818,8 @@ internal_delete(Queue, ActingUser, Reason) -> {error, timeout} = Err -> Err; Deletions -> - _ = rabbit_binding:process_deletions(Deletions), - rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER), + ok = rabbit_binding:process_deletions(Deletions), + ok = rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER), rabbit_core_metrics:queue_deleted(QueueName), ok = rabbit_event:notify(queue_deleted, [{name, QueueName}, @@ -1942,14 +1942,14 @@ filter_transient_queues_to_delete(Node) -> end. notify_queue_binding_deletions(QueueDeletions) when is_list(QueueDeletions) -> - Deletions = rabbit_binding:process_deletions( - lists:foldl(fun rabbit_binding:combine_deletions/2, - rabbit_binding:new_deletions(), - QueueDeletions)), + Deletions = lists:foldl( + fun rabbit_binding:combine_deletions/2, + rabbit_binding:new_deletions(), QueueDeletions), + ok = rabbit_binding:process_deletions(Deletions), rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER); notify_queue_binding_deletions(QueueDeletions) -> - Deletions = rabbit_binding:process_deletions(QueueDeletions), - rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER). + ok = rabbit_binding:process_deletions(QueueDeletions), + rabbit_binding:notify_deletions(QueueDeletions, ?INTERNAL_USER). notify_transient_queues_deleted(QueueDeletions) -> lists:foreach( diff --git a/deps/rabbit/src/rabbit_binding.erl b/deps/rabbit/src/rabbit_binding.erl index cf7f79b51e6a..bde550e2d0a6 100644 --- a/deps/rabbit/src/rabbit_binding.erl +++ b/deps/rabbit/src/rabbit_binding.erl @@ -13,7 +13,7 @@ -export([list/1, list_for_source/1, list_for_destination/1, list_for_source_and_destination/2, list_for_source_and_destination/3, list_explicit/0]). --export([new_deletions/0, combine_deletions/2, add_deletion/3, +-export([new_deletions/0, combine_deletions/2, add_deletion/5, process_deletions/1, notify_deletions/2, group_bindings_fold/3]). -export([info_keys/0, info/1, info/2, info_all/1, info_all/2, info_all/4]). @@ -22,6 +22,9 @@ -export([reverse_route/1, index_route/1]). -export([binding_type/2]). +%% For testing only +-export([fetch_deletion/2]). + -define(DEFAULT_EXCHANGE(VHostPath), #resource{virtual_host = VHostPath, kind = exchange, name = <<>>}). @@ -50,9 +53,12 @@ rabbit_types:ok_or_error(rabbit_types:amqp_error())). -type bindings() :: [rabbit_types:binding()]. -%% TODO this should really be opaque but that seems to confuse 17.1's -%% dialyzer into objecting to everything that uses it. --type deletions() :: dict:dict(). +-record(deletion, {exchange :: rabbit_types:exchange(), + %% Whether the exchange was deleted. + deleted :: boolean(), + bindings :: sets:set(rabbit_types:binding())}). + +-opaque deletions() :: #{XName :: rabbit_exchange:name() => #deletion{}}. %%---------------------------------------------------------------------------- @@ -159,6 +165,19 @@ binding_type0(false, true) -> binding_type0(_, _) -> transient. +binding_checks(Binding, InnerFun) -> + fun(Src, Dst) -> + case rabbit_exchange:validate_binding(Src, Binding) of + ok -> + %% this argument is used to check queue exclusivity; + %% in general, we want to fail on that in preference to + %% anything else + InnerFun(Src, Dst); + Err -> + Err + end + end. + -spec remove(rabbit_types:binding(), rabbit_types:username()) -> bind_res(). remove(Binding, ActingUser) -> remove(Binding, fun (_Src, _Dst) -> ok end, ActingUser). @@ -360,57 +379,96 @@ index_route(#route{binding = #binding{source = Source, %% ---------------------------------------------------------------------------- %% Binding / exchange deletion abstraction API %% ---------------------------------------------------------------------------- - -anything_but( NotThis, NotThis, NotThis) -> NotThis; -anything_but( NotThis, NotThis, This) -> This; -anything_but( NotThis, This, NotThis) -> This; -anything_but(_NotThis, This, This) -> This. +%% +%% `deletions()' describe a set of removals of bindings and/or exchanges from +%% the metadata store. +%% +%% This deletion collection is used for two purposes: +%% +%% +%% +%% The point of collecting deletions into this opaque type is to be able to +%% collect all bindings deleted for a given exchange into a list. This allows +%% us to invoke the `rabbit_exchange:remove_bindings/2' callback with all +%% deleted bindings at once rather than passing each deleted binding +%% individually. -spec new_deletions() -> deletions(). -new_deletions() -> dict:new(). - --spec add_deletion - (rabbit_exchange:name(), - {'undefined' | rabbit_types:exchange(), - 'deleted' | 'not_deleted', - bindings()}, - deletions()) -> - deletions(). - -add_deletion(XName, Entry, Deletions) -> - dict:update(XName, fun (Entry1) -> merge_entry(Entry1, Entry) end, - Entry, Deletions). +new_deletions() -> #{}. + +-spec add_deletion(XName, X, XDeleted, Bindings, Deletions) -> Deletions1 + when + XName :: rabbit_exchange:name(), + X :: rabbit_types:exchange(), + XDeleted :: deleted | not_deleted, + Bindings :: bindings(), + Deletions :: deletions(), + Deletions1 :: deletions(). + +add_deletion(XName, X, WasDeleted, Bindings, Deletions) + when (WasDeleted =:= deleted orelse WasDeleted =:= not_deleted) andalso + is_list(Bindings) andalso is_map(Deletions) -> + WasDeleted1 = case WasDeleted of + deleted -> true; + not_deleted -> false + end, + Bindings1 = sets:from_list(Bindings, [{version, 2}]), + Deletion = #deletion{exchange = X, + deleted = WasDeleted1, + bindings = Bindings1}, + maps:update_with( + XName, + fun(Deletion1) -> + merge_deletion(Deletion1, Deletion) + end, Deletion, Deletions). -spec combine_deletions(deletions(), deletions()) -> deletions(). -combine_deletions(Deletions1, Deletions2) -> - dict:merge(fun (_XName, Entry1, Entry2) -> merge_entry(Entry1, Entry2) end, - Deletions1, Deletions2). - -merge_entry({X1, Deleted1, Bindings1}, {X2, Deleted2, Bindings2}) -> - {anything_but(undefined, X1, X2), - anything_but(not_deleted, Deleted1, Deleted2), - Bindings1 ++ Bindings2}; -merge_entry({X1, Deleted1, Bindings1, none}, {X2, Deleted2, Bindings2, none}) -> - {anything_but(undefined, X1, X2), - anything_but(not_deleted, Deleted1, Deleted2), - Bindings1 ++ Bindings2, none}. - -notify_deletions({error, not_found}, _) -> - ok; -notify_deletions(Deletions, ActingUser) -> - dict:fold(fun (XName, {_X, deleted, Bs, _}, ok) -> - notify_exchange_deletion(XName, ActingUser), - notify_bindings_deletion(Bs, ActingUser); - (_XName, {_X, not_deleted, Bs, _}, ok) -> - notify_bindings_deletion(Bs, ActingUser); - (XName, {_X, deleted, Bs}, ok) -> +combine_deletions(Deletions1, Deletions2) + when is_map(Deletions1) andalso is_map(Deletions2) -> + maps:merge_with( + fun (_XName, Deletion1, Deletion2) -> + merge_deletion(Deletion1, Deletion2) + end, Deletions1, Deletions2). + +merge_deletion( + #deletion{deleted = Deleted1, bindings = Bindings1}, + #deletion{exchange = X2, deleted = Deleted2, bindings = Bindings2}) -> + %% Assume that X2 is more up to date than X1. + X = X2, + Deleted = Deleted1 orelse Deleted2, + Bindings = sets:union(Bindings1, Bindings2), + #deletion{exchange = X, + deleted = Deleted, + bindings = Bindings}. + +-spec notify_deletions(Deletions, ActingUser) -> ok when + Deletions :: rabbit_binding:deletions(), + ActingUser :: rabbit_types:username(). + +notify_deletions(Deletions, ActingUser) when is_map(Deletions) -> + maps:foreach( + fun (XName, #deletion{deleted = XDeleted, bindings = Bindings}) -> + case XDeleted of + true -> notify_exchange_deletion(XName, ActingUser), - notify_bindings_deletion(Bs, ActingUser); - (_XName, {_X, not_deleted, Bs}, ok) -> - notify_bindings_deletion(Bs, ActingUser) - end, ok, Deletions). + notify_bindings_deletion(Bindings, ActingUser); + false -> + notify_bindings_deletion(Bindings, ActingUser) + end + end, Deletions). notify_exchange_deletion(XName, ActingUser) -> ok = rabbit_event:notify( @@ -418,35 +476,58 @@ notify_exchange_deletion(XName, ActingUser) -> [{name, XName}, {user_who_performed_action, ActingUser}]). -notify_bindings_deletion(Bs, ActingUser) -> - [rabbit_event:notify(binding_deleted, - info(B) ++ [{user_who_performed_action, ActingUser}]) - || B <- Bs], - ok. +notify_bindings_deletion(Bindings, ActingUser) -> + sets:fold( + fun(Binding, ok) -> + rabbit_event:notify( + binding_deleted, + info(Binding) ++ [{user_who_performed_action, ActingUser}]), + ok + end, ok, Bindings). --spec process_deletions(deletions()) -> deletions(). +-spec process_deletions(deletions()) -> ok. process_deletions(Deletions) -> - dict:map(fun (_XName, {X, deleted, Bindings}) -> - Bs = lists:flatten(Bindings), - Serial = rabbit_exchange:serial(X), - rabbit_exchange:callback(X, delete, Serial, [X]), - {X, deleted, Bs, none}; - (_XName, {X, not_deleted, Bindings}) -> - Bs = lists:flatten(Bindings), - Serial = rabbit_exchange:serial(X), - rabbit_exchange:callback(X, remove_bindings, Serial, [X, Bs]), - {X, not_deleted, Bs, none} - end, Deletions). - -binding_checks(Binding, InnerFun) -> - fun(Src, Dst) -> - case rabbit_exchange:validate_binding(Src, Binding) of - ok -> - %% this argument is used to check queue exclusivity; - %% in general, we want to fail on that in preference to - %% anything else - InnerFun(Src, Dst); - Err -> - Err - end + maps:foreach( + fun (_XName, #deletion{exchange = X, + deleted = XDeleted, + bindings = Bindings}) -> + Serial = rabbit_exchange:serial(X), + case XDeleted of + true -> + rabbit_exchange:callback(X, delete, Serial, [X]); + false -> + Bindings1 = sets:to_list(Bindings), + rabbit_exchange:callback( + X, remove_bindings, Serial, [X, Bindings1]) + end + end, Deletions). + +-spec fetch_deletion(XName, Deletions) -> Ret when + XName :: rabbit_exchange:name(), + Deletions :: deletions(), + Ret :: {X, WasDeleted, Bindings}, + X :: rabbit_types:exchange(), + WasDeleted :: deleted | not_deleted, + Bindings :: bindings(). +%% @doc Fetches the deletions for the given exchange name. +%% +%% This function is only intended for use in tests. +%% +%% @private + +fetch_deletion(XName, Deletions) -> + case maps:find(XName, Deletions) of + {ok, #deletion{exchange = X, + deleted = Deleted, + bindings = Bindings}} -> + WasDeleted = case Deleted of + true -> + deleted; + false -> + not_deleted + end, + Bindings1 = sets:to_list(Bindings), + {X, WasDeleted, Bindings1}; + error -> + error end. diff --git a/deps/rabbit/src/rabbit_db_binding.erl b/deps/rabbit/src/rabbit_db_binding.erl index 942b3a648110..9bb02277ca52 100644 --- a/deps/rabbit/src/rabbit_db_binding.erl +++ b/deps/rabbit/src/rabbit_db_binding.erl @@ -302,7 +302,10 @@ delete_in_mnesia(Src, Dst, B) -> should_index_table(Src), fun delete/3), Deletions0 = maybe_auto_delete_exchange_in_mnesia( B#binding.source, [B], rabbit_binding:new_deletions(), false), - fun() -> {ok, rabbit_binding:process_deletions(Deletions0)} end. + fun() -> + ok = rabbit_binding:process_deletions(Deletions0), + {ok, Deletions0} + end. absent_errs_only_in_mnesia(Names) -> Errs = [E || Name <- Names, @@ -352,7 +355,8 @@ delete_in_khepri(#binding{source = SrcName, {error, _} = Err -> Err; Deletions -> - {ok, rabbit_binding:process_deletions(Deletions)} + ok = rabbit_binding:process_deletions(Deletions), + {ok, Deletions} end. exists_in_khepri(Path, Binding) -> @@ -379,15 +383,18 @@ delete_in_khepri(Binding) -> end. maybe_auto_delete_exchange_in_khepri(XName, Bindings, Deletions, OnlyDurable) -> - {Entry, Deletions1} = - case rabbit_db_exchange:maybe_auto_delete_in_khepri(XName, OnlyDurable) of - {not_deleted, X} -> - {{X, not_deleted, Bindings}, Deletions}; - {deleted, X, Deletions2} -> - {{X, deleted, Bindings}, - rabbit_binding:combine_deletions(Deletions, Deletions2)} - end, - rabbit_binding:add_deletion(XName, Entry, Deletions1). + case rabbit_db_exchange:maybe_auto_delete_in_khepri(XName, OnlyDurable) of + {not_deleted, undefined} -> + Deletions; + {not_deleted, X} -> + rabbit_binding:add_deletion( + XName, X, not_deleted, Bindings, Deletions); + {deleted, X, Deletions1} -> + Deletions2 = rabbit_binding:combine_deletions( + Deletions, Deletions1), + rabbit_binding:add_deletion( + XName, X, deleted, Bindings, Deletions2) + end. %% ------------------------------------------------------------------- %% get_all(). @@ -1152,15 +1159,18 @@ sync_index_route(_, _, _) -> OnlyDurable :: boolean(), Ret :: rabbit_binding:deletions(). maybe_auto_delete_exchange_in_mnesia(XName, Bindings, Deletions, OnlyDurable) -> - {Entry, Deletions1} = - case rabbit_db_exchange:maybe_auto_delete_in_mnesia(XName, OnlyDurable) of - {not_deleted, X} -> - {{X, not_deleted, Bindings}, Deletions}; - {deleted, X, Deletions2} -> - {{X, deleted, Bindings}, - rabbit_binding:combine_deletions(Deletions, Deletions2)} - end, - rabbit_binding:add_deletion(XName, Entry, Deletions1). + case rabbit_db_exchange:maybe_auto_delete_in_mnesia(XName, OnlyDurable) of + {not_deleted, undefined} -> + Deletions; + {not_deleted, X} -> + rabbit_binding:add_deletion( + XName, X, not_deleted, Bindings, Deletions); + {deleted, X, Deletions1} -> + Deletions2 = rabbit_binding:combine_deletions( + Deletions, Deletions1), + rabbit_binding:add_deletion( + XName, X, deleted, Bindings, Deletions2) + end. %% Instead of locking entire table on remove operations we can lock the %% affected resource only. diff --git a/deps/rabbit/src/rabbit_db_exchange.erl b/deps/rabbit/src/rabbit_db_exchange.erl index f8c37a22428f..ef6b9f3c61aa 100644 --- a/deps/rabbit/src/rabbit_db_exchange.erl +++ b/deps/rabbit/src/rabbit_db_exchange.erl @@ -573,7 +573,7 @@ next_serial_in_khepri_tx(#exchange{name = XName}) -> IfUnused :: boolean(), Exchange :: rabbit_types:exchange(), Binding :: rabbit_types:binding(), - Deletions :: dict:dict(), + Deletions :: rabbit_binding:deletions(), Ret :: {deleted, Exchange, [Binding], Deletions} | {error, not_found} | {error, in_use} | @@ -624,7 +624,7 @@ unconditional_delete_in_mnesia(X, OnlyDurable) -> RemoveBindingsForSource :: boolean(), Exchange :: rabbit_types:exchange(), Binding :: rabbit_types:binding(), - Deletions :: dict:dict(), + Deletions :: rabbit_binding:deletions(), Ret :: {error, not_found} | {error, in_use} | {deleted, Exchange, [Binding], Deletions}. delete_in_mnesia(X = #exchange{name = XName}, OnlyDurable, RemoveBindingsForSource) -> ok = mnesia:delete({?MNESIA_TABLE, XName}), @@ -695,7 +695,7 @@ delete_all_in_mnesia_tx(VHostName) -> {deleted, #exchange{name = XName}, Bindings, XDeletions} = unconditional_delete_in_mnesia( X, false), XDeletions1 = rabbit_binding:add_deletion( - XName, {X, deleted, Bindings}, XDeletions), + XName, X, deleted, Bindings, XDeletions), rabbit_binding:combine_deletions(Acc, XDeletions1) end, rabbit_binding:new_deletions(), Xs), {ok, Deletions}. @@ -716,7 +716,7 @@ delete_all_in_khepri_tx(VHostName) -> rabbit_db_binding:delete_all_for_exchange_in_khepri( X, false, true), Deletions1 = rabbit_binding:add_deletion( - XName, {X, deleted, Bindings}, XDeletions), + XName, X, deleted, Bindings, XDeletions), rabbit_binding:combine_deletions(Deletions, Deletions1) end, rabbit_binding:new_deletions(), NodeProps), {ok, Deletions}. diff --git a/deps/rabbit/src/rabbit_exchange.erl b/deps/rabbit/src/rabbit_exchange.erl index b4037f9a8078..391b6b8934e0 100644 --- a/deps/rabbit/src/rabbit_exchange.erl +++ b/deps/rabbit/src/rabbit_exchange.erl @@ -470,13 +470,15 @@ delete(XName, IfUnused, Username) -> _ = rabbit_runtime_parameters:set(XName#resource.virtual_host, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, XName#resource.name, true, Username), - Deletions = process_deletions(rabbit_db_exchange:delete(XName, IfUnused)), - case Deletions of - {error, _} -> - Deletions; - _ -> - rabbit_binding:notify_deletions(Deletions, Username), - ok + case rabbit_db_exchange:delete(XName, IfUnused) of + {deleted, #exchange{name = XName} = X, Bs, Deletions} -> + Deletions1 = rabbit_binding:add_deletion( + XName, X, deleted, Bs, Deletions), + ok = rabbit_binding:process_deletions(Deletions1), + ok = rabbit_binding:notify_deletions(Deletions1, Username), + ok; + {error, _} = Err -> + Err end after rabbit_runtime_parameters:clear(XName#resource.virtual_host, @@ -491,17 +493,10 @@ delete(XName, IfUnused, Username) -> delete_all(VHostName, ActingUser) -> {ok, Deletions} = rabbit_db_exchange:delete_all(VHostName), - Deletions1 = rabbit_binding:process_deletions(Deletions), - rabbit_binding:notify_deletions(Deletions1, ActingUser), + ok = rabbit_binding:process_deletions(Deletions), + ok = rabbit_binding:notify_deletions(Deletions, ActingUser), ok. -process_deletions({error, _} = E) -> - E; -process_deletions({deleted, #exchange{name = XName} = X, Bs, Deletions}) -> - rabbit_binding:process_deletions( - rabbit_binding:add_deletion( - XName, {X, deleted, Bs}, Deletions)). - -spec ensure_deleted(ExchangeName, IfUnused, Username) -> Ret when ExchangeName :: name(), IfUnused :: boolean(), diff --git a/deps/rabbit/test/rabbit_db_binding_SUITE.erl b/deps/rabbit/test/rabbit_db_binding_SUITE.erl index 9055e4ff1ddb..07eb0aea09d0 100644 --- a/deps/rabbit/test/rabbit_db_binding_SUITE.erl +++ b/deps/rabbit/test/rabbit_db_binding_SUITE.erl @@ -131,8 +131,8 @@ delete1(_Config) -> Ret = rabbit_db_binding:delete(Binding, fun(_, _) -> ok end), ?assertMatch({ok, _}, Ret), {ok, Deletions} = Ret, - ?assertMatch({#exchange{}, not_deleted, [#binding{}], none}, - dict:fetch(XName1, Deletions)), + ?assertMatch({#exchange{}, not_deleted, [#binding{}]}, + rabbit_binding:fetch_deletion(XName1, Deletions)), ?assertEqual(false, rabbit_db_binding:exists(Binding)), passed. @@ -152,8 +152,8 @@ auto_delete1(_Config) -> Ret = rabbit_db_binding:delete(Binding, fun(_, _) -> ok end), ?assertMatch({ok, _}, Ret), {ok, Deletions} = Ret, - ?assertMatch({#exchange{}, deleted, [#binding{}], none}, - dict:fetch(XName1, Deletions)), + ?assertMatch({#exchange{}, not_deleted, [#binding{}]}, + rabbit_binding:fetch_deletion(XName1, Deletions)), ?assertEqual(false, rabbit_db_binding:exists(Binding)), passed. diff --git a/deps/rabbit/test/rabbit_db_queue_SUITE.erl b/deps/rabbit/test/rabbit_db_queue_SUITE.erl index f66e8fd236c9..06ff1a4889d2 100644 --- a/deps/rabbit/test/rabbit_db_queue_SUITE.erl +++ b/deps/rabbit/test/rabbit_db_queue_SUITE.erl @@ -292,8 +292,8 @@ delete1(_Config) -> ?assertEqual({ok, Q}, rabbit_db_queue:get(QName)), %% TODO Can we handle the deletions outside of rabbit_db_queue? Probably not because %% they should be done in a single transaction, but what a horrid API to have! - Dict = rabbit_db_queue:delete(QName, normal), - ?assertEqual(0, dict:size(Dict)), + Deletions = rabbit_db_queue:delete(QName, normal), + ?assertEqual(rabbit_binding:new_deletions(), Deletions), ?assertEqual(ok, rabbit_db_queue:delete(QName, normal)), ?assertEqual({error, not_found}, rabbit_db_queue:get(QName)), passed.