Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also .

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also .
base repository: rabbitmq/rabbitmq-server
base: v3.8.21
head repository: rabbitmq/rabbitmq-server
compare: v3.8.22
Commits on Aug 11, 2021
Add tests for the regression introduced in #3041  (backport #3295)
Commits on Aug 19, 2021
The table might not exist yet (or is already gone) between the time
rabbit_nodes:all_running() runs and returns a specific node, and
mnesia:dirty_match_object() is called for that node's table.

This seems to happen frequently in CI.

(cherry picked from commit 0b1942b)
rabbit_{connection,channel}_tracking: Fix race condition in list() (backport #3324)
Commits on Aug 24, 2021
…with-metadata

Correctly import virtual host metadata

(cherry picked from commit a809e55)
specifically tags

(cherry picked from commit 6539ad7)
Currently they can only be specified at creation time

(cherry picked from commit a5373d7)
Commits on Aug 26, 2021
Commits on Aug 28, 2021
…import-of-vhost-with-metadata""

This reverts commit 5e561da.
@@ -795,7 +795,7 @@ suites = [
rabbitmq_integration_suite(
PACKAGE,
name = "unit_amqp091_server_properties_SUITE",
size = "small",
size = "medium",
),
rabbitmq_integration_suite(
PACKAGE,
@@ -873,7 +873,7 @@ suites = [
rabbitmq_integration_suite(
PACKAGE,
name = "unit_log_management_SUITE",
size = "small",
size = "medium",
),
rabbitmq_suite(
name = "unit_operator_policy_SUITE",
@@ -179,7 +179,18 @@ list() ->
lists:foldl(
fun (Node, Acc) ->
Tab = tracked_channel_table_name_for(Node),
Acc ++ mnesia:dirty_match_object(Tab, #tracked_channel{_ = '_'})
try
Acc ++
mnesia:dirty_match_object(Tab, #tracked_channel{_ = '_'})
catch
exit:{aborted, {no_exists, [Tab, _]}} ->
%% The table might not exist yet (or is already gone)
%% between the time rabbit_nodes:all_running() runs and
%% returns a specific node, and
%% mnesia:dirty_match_object() is called for that node's
%% table.
Acc
end
end, [], rabbit_nodes:all_running()).

-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_channel()].
@@ -351,7 +351,18 @@ list() ->
lists:foldl(
fun (Node, Acc) ->
Tab = tracked_connection_table_name_for(Node),
Acc ++ mnesia:dirty_match_object(Tab, #tracked_connection{_ = '_'})
try
Acc ++
mnesia:dirty_match_object(Tab, #tracked_connection{_ = '_'})
catch
exit:{aborted, {no_exists, [Tab, _]}} ->
%% The table might not exist yet (or is already gone)
%% between the time rabbit_nodes:all_running() runs and
%% returns a specific node, and
%% mnesia:dirty_match_object() is called for that node's
%% table.
Acc
end
end, [], rabbit_nodes:all_running()).

-spec count() -> non_neg_integer().
@@ -360,6 +371,9 @@ count() ->
lists:foldl(
fun (Node, Acc) ->
Tab = tracked_connection_table_name_for(Node),
%% mnesia:table_info() returns 0 if the table doesn't exist. We
%% don't need the same kind of protection as the list() function
%% above.
Acc + mnesia:table_info(Tab, size)
end, 0, rabbit_nodes:all_running()).

@@ -461,11 +461,13 @@ add_policy(VHost, Param, Username) ->
-spec add_vhost(map(), rabbit_types:username()) -> ok.

add_vhost(VHost, ActingUser) ->
VHostName = maps:get(name, VHost, undefined),
VHostTrace = maps:get(tracing, VHost, undefined),
VHostDefinition = maps:get(definition, VHost, undefined),
VHostTags = maps:get(tags, VHost, undefined),
rabbit_vhost:put_vhost(VHostName, VHostDefinition, VHostTags, VHostTrace, ActingUser).
Name = maps:get(name, VHost, undefined),
IsTracingEnabled = maps:get(tracing, VHost, undefined),
Metadata = rabbit_data_coercion:atomize_keys(maps:get(metadata, VHost, #{})),
Description = maps:get(description, VHost, maps:get(description, Metadata, <<"">>)),
Tags = maps:get(tags, VHost, maps:get(tags, Metadata, [])),

rabbit_vhost:put_vhost(Name, Description, Tags, IsTracingEnabled, ActingUser).

add_permission(Permission, ActingUser) ->
rabbit_auth_backend_internal:set_permissions(maps:get(user, Permission, undefined),
@@ -13,7 +13,9 @@
-export([recover/0, recover/1, read_config/1]).
-export([add/2, add/4, delete/2, exists/1, with/2, with_user_and_vhost/3, assert/1, update/2,
set_limits/2, vhost_cluster_state/1, is_running_on_all_nodes/1, await_running_on_all_nodes/2,
list/0, count/0, list_names/0, all/0, parse_tags/1]).
list/0, count/0, list_names/0, all/0]).
-export([parse_tags/1, update_metadata/2, tag_with/2, untag_from/2, update_tags/2, update_tags/3]).
-export([lookup/1]).
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0, config_file_path/1, ensure_config_file/1]).
-export([delete_storage/1]).
@@ -24,6 +26,9 @@
%% API
%%

-type vhost_tag() :: atom() | string() | binary().
-export_type([vhost_tag/0]).

recover() ->
%% Clear out remnants of old incarnation, in case we restarted
%% faster than other nodes handled DOWN messages from us.
@@ -378,45 +383,98 @@ all() -> mnesia:dirty_match_object(rabbit_vhost, vhost:pattern_match_all()).
count() ->
length(list()).

-spec with(vhost:name(), rabbit_misc:thunk(A)) -> A.
-spec lookup(vhost:name()) -> vhost:vhost() | rabbit_types:ok_or_error(any()).
lookup(VHostName) ->
case rabbit_misc:dirty_read({rabbit_vhost, VHostName}) of
{error, not_found} -> {error, {no_such_vhost, VHostName}};
{ok, Record} -> Record
end.

with(VHost, Thunk) ->
-spec with(vhost:name(), rabbit_misc:thunk(A)) -> A.
with(VHostName, Thunk) ->
fun () ->
case mnesia:read({rabbit_vhost, VHost}) of
[] ->
mnesia:abort({no_such_vhost, VHost});
[_V] ->
Thunk()
end
case mnesia:read({rabbit_vhost, VHostName}) of
[] -> mnesia:abort({no_such_vhost, VHostName});
[_V] -> Thunk()
end
end.

-spec with_user_and_vhost
(rabbit_types:username(), vhost:name(), rabbit_misc:thunk(A)) -> A.

with_user_and_vhost(Username, VHost, Thunk) ->
rabbit_misc:with_user(Username, with(VHost, Thunk)).
-spec with_user_and_vhost(rabbit_types:username(), vhost:name(), rabbit_misc:thunk(A)) -> A.
with_user_and_vhost(Username, VHostName, Thunk) ->
rabbit_misc:with_user(Username, with(VHostName, Thunk)).

%% Like with/2 but outside an Mnesia tx

-spec assert(vhost:name()) -> 'ok'.

assert(VHost) -> case exists(VHost) of
true -> ok;
false -> throw({error, {no_such_vhost, VHost}})
end.
assert(VHostName) ->
case exists(VHostName) of
true -> ok;
false -> throw({error, {no_such_vhost, VHostName}})
end.

-spec update(vhost:name(), fun((vhost:vhost()) -> vhost:vhost())) -> vhost:vhost().

update(VHost, Fun) ->
case mnesia:read({rabbit_vhost, VHost}) of
update(VHostName, Fun) ->
case mnesia:read({rabbit_vhost, VHostName}) of
[] ->
mnesia:abort({no_such_vhost, VHost});
mnesia:abort({no_such_vhost, VHostName});
[V] ->
V1 = Fun(V),
ok = mnesia:write(rabbit_vhost, V1, write),
V1
end.

-spec update_metadata(vhost:name(), fun((map())-> map())) -> vhost:vhost() | rabbit_types:ok_or_error(any()).
update_metadata(VHostName, Fun) ->
update(VHostName, fun(Record) ->
Meta = Fun(vhost:get_metadata(Record)),
vhost:set_metadata(Record, Meta)
end).

-spec update_tags(vhost:name(), [vhost_tag()], rabbit_types:username()) -> vhost:vhost() | rabbit_types:ok_or_error(any()).
update_tags(VHostName, Tags, ActingUser) ->
ConvertedTags = [rabbit_data_coercion:to_atom(I) || I <- Tags],
try
R = rabbit_misc:execute_mnesia_transaction(fun() ->
update_tags(VHostName, ConvertedTags)
end),
rabbit_log:info("Successfully set tags for virtual host '~s' to ~p", [VHostName, ConvertedTags]),
rabbit_event:notify(vhost_tags_set, [{name, VHostName},
{tags, ConvertedTags},
{user_who_performed_action, ActingUser}]),
R
catch
throw:{error, {no_such_vhost, _}} = Error ->
rabbit_log:warning("Failed to set tags for virtual host '~s': the virtual host does not exist", [VHostName]),
throw(Error);
throw:Error ->
rabbit_log:warning("Failed to set tags for virtual host '~s': ~p", [VHostName, Error]),
throw(Error);
exit:Error ->
rabbit_log:warning("Failed to set tags for virtual host '~s': ~p", [VHostName, Error]),
exit(Error)
end.

-spec update_tags(vhost:name(), [vhost_tag()]) -> vhost:vhost() | rabbit_types:ok_or_error(any()).
update_tags(VHostName, Tags) ->
ConvertedTags = [rabbit_data_coercion:to_atom(I) || I <- Tags],
update(VHostName, fun(Record) ->
Meta0 = vhost:get_metadata(Record),
Meta = maps:update(tags, ConvertedTags, Meta0),
vhost:set_metadata(Record, Meta)
end).

-spec tag_with(vhost:name(), [atom()]) -> vhost:vhost() | rabbit_types:ok_or_error(any()).
tag_with(VHostName, Tags) when is_list(Tags) ->
update_metadata(VHostName, fun(#{tags := Tags0} = Meta) ->
maps:update(tags, lists:usort(Tags0 ++ Tags), Meta)
end).

-spec untag_from(vhost:name(), [atom()]) -> vhost:vhost() | rabbit_types:ok_or_error(any()).
untag_from(VHostName, Tags) when is_list(Tags) ->
update_metadata(VHostName, fun(#{tags := Tags0} = Meta) ->
maps:update(tags, lists:usort(Tags0 -- Tags), Meta)
end).

set_limits(VHost, undefined) ->
vhost:set_limits(VHost, []);
set_limits(VHost, Limits) ->
@@ -25,7 +25,8 @@
get_metadata/1,
get_description/1,
get_tags/1,
set_limits/2
set_limits/2,
set_metadata/2
]).

-define(record_version, vhost_v2).
@@ -170,3 +171,12 @@ set_limits(VHost, Value) ->
_ ->
vhost_v1:set_limits(VHost, Value)
end.

set_metadata(VHost, Value) ->
case record_version_to_use() of
?record_version ->
VHost#vhost{metadata = Value};
_ ->
%% the field is not available, so this is a no-op
VHost
end.
@@ -28,6 +28,8 @@
variable_queue_fold_msg_on_disk,
variable_queue_dropfetchwhile,
variable_queue_dropwhile_varying_ram_duration,
variable_queue_dropwhile_restart,
variable_queue_dropwhile_sync_restart,
variable_queue_fetchwhile_varying_ram_duration,
variable_queue_ack_limiting,
variable_queue_purge,
@@ -983,6 +985,87 @@ variable_queue_dropfetchwhile2(VQ0, _QName) ->

VQ5.

variable_queue_dropwhile_restart(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, variable_queue_dropwhile_restart1, [Config]).

variable_queue_dropwhile_restart1(Config) ->
with_fresh_variable_queue(
fun variable_queue_dropwhile_restart2/2,
?config(variable_queue_type, Config)).

variable_queue_dropwhile_restart2(VQ0, QName) ->
Count = 10000,

%% add messages with sequential expiry
VQ1 = variable_queue_publish(
true, 1, Count,
fun (N, Props) -> Props#message_properties{expiry = N} end,
fun erlang:term_to_binary/1, VQ0),

%% drop the first 5 messages
{#message_properties{expiry = 6}, VQ2} =
rabbit_variable_queue:dropwhile(
fun (#message_properties {expiry = Expiry}) -> Expiry =< 5 end, VQ1),

_VQ3 = rabbit_variable_queue:terminate(shutdown, VQ2),
Terms = variable_queue_read_terms(QName),
VQ4 = variable_queue_init(test_amqqueue(QName, true), Terms),

%% fetch 5
VQ5 = lists:foldl(fun (_, VQN) ->
{{_, _, _}, VQM} =
rabbit_variable_queue:fetch(false, VQN),
VQM
end, VQ4, lists:seq(6, Count)),

%% should be empty now
true = rabbit_variable_queue:is_empty(VQ5),

VQ5.

variable_queue_dropwhile_sync_restart(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, variable_queue_dropwhile_sync_restart1, [Config]).

variable_queue_dropwhile_sync_restart1(Config) ->
with_fresh_variable_queue(
fun variable_queue_dropwhile_sync_restart2/2,
?config(variable_queue_type, Config)).

variable_queue_dropwhile_sync_restart2(VQ0, QName) ->
Count = 10000,

%% add messages with sequential expiry
VQ1 = variable_queue_publish(
true, 1, Count,
fun (N, Props) -> Props#message_properties{expiry = N} end,
fun erlang:term_to_binary/1, VQ0),

%% drop the first 5 messages
{#message_properties{expiry = 6}, VQ2} =
rabbit_variable_queue:dropwhile(
fun (#message_properties {expiry = Expiry}) -> Expiry =< 5 end, VQ1),

%% Queue index sync.
VQ2b = rabbit_variable_queue:handle_pre_hibernate(VQ2),

_VQ3 = rabbit_variable_queue:terminate(shutdown, VQ2b),
Terms = variable_queue_read_terms(QName),
VQ4 = variable_queue_init(test_amqqueue(QName, true), Terms),

%% fetch 5
VQ5 = lists:foldl(fun (_, VQN) ->
{{_, _, _}, VQM} =
rabbit_variable_queue:fetch(false, VQN),
VQM
end, VQ4, lists:seq(6, Count)),

%% should be empty now
true = rabbit_variable_queue:is_empty(VQ5),

VQ5.

variable_queue_dropwhile_varying_ram_duration(Config) ->
passed = rabbit_ct_broker_helpers:rpc(Config, 0,
?MODULE, variable_queue_dropwhile_varying_ram_duration1, [Config]).
@@ -1370,9 +1453,19 @@ variable_queue_init(Q, Recover) ->
rabbit_variable_queue:init(
Q, case Recover of
true -> non_clean_shutdown;
false -> new
false -> new;
Terms -> Terms
end, fun nop/2, fun nop/2, fun nop/1, fun nop/1).

variable_queue_read_terms(QName) ->
#resource { kind = queue,
virtual_host = VHost,
name = Name } = QName,
<<Num:128>> = erlang:md5(<<"queue", VHost/binary, Name/binary>>),
DirName = rabbit_misc:format("~.36B", [Num]),
{ok, Terms} = rabbit_recovery_terms:read(VHost, DirName),
Terms.

publish_and_confirm(Q, Payload, Count) ->
Seqs = lists:seq(1, Count),
[begin

No commit comments for this range