diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 55441f82f38..f5865d399a2 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -778,31 +778,38 @@ augment_declare_args(VHost, Durable, Exclusive, AutoDelete, Args0) -> #{default_queue_type := DefaultQueueType} when is_binary(DefaultQueueType) andalso not HasQTypeArg -> - Type = rabbit_queue_type:discover(DefaultQueueType), - IsPermitted = is_queue_args_combination_permitted( - Durable, Exclusive), - IsCompatible = rabbit_queue_type:is_compatible( - Type, Durable, Exclusive, AutoDelete), - case IsPermitted andalso IsCompatible of - true -> - %% patch up declare arguments with x-queue-type if there - %% is a vhost default set the queue is durable and not exclusive - %% and there is no queue type argument - %% present - rabbit_misc:set_table_value(Args0, - <<"x-queue-type">>, - longstr, - DefaultQueueType); - false -> - %% if the properties are incompatible with the declared - %% DQT, use the fall back type - rabbit_misc:set_table_value(Args0, - <<"x-queue-type">>, - longstr, - rabbit_queue_type:short_alias_of(rabbit_queue_type:fallback())) - end; + update_args_table_with_queue_type(DefaultQueueType, Durable, Exclusive, AutoDelete, Args0); _ -> - Args0 + case HasQTypeArg of + true -> Args0; + false -> + update_args_table_with_queue_type(rabbit_queue_type:short_alias_of(rabbit_queue_type:default()), Durable, Exclusive, AutoDelete, Args0) + end + end. + +update_args_table_with_queue_type(DefaultQueueType, Durable, Exclusive, AutoDelete, Args) -> + Type = rabbit_queue_type:discover(DefaultQueueType), + IsPermitted = is_queue_args_combination_permitted( + Durable, Exclusive), + IsCompatible = rabbit_queue_type:is_compatible( + Type, Durable, Exclusive, AutoDelete), + case IsPermitted andalso IsCompatible of + true -> + %% patch up declare arguments with x-queue-type if there + %% is a vhost default set the queue is durable and not exclusive + %% and there is no queue type argument + %% present + rabbit_misc:set_table_value(Args, + <<"x-queue-type">>, + longstr, + DefaultQueueType); + false -> + %% if the properties are incompatible with the declared + %% DQT, use the fall back type + rabbit_misc:set_table_value(Args, + <<"x-queue-type">>, + longstr, + rabbit_queue_type:short_alias_of(rabbit_queue_type:fallback())) end. -spec check_exclusive_access(amqqueue:amqqueue(), pid()) -> diff --git a/deps/rabbit/src/rabbit_queue_type.erl b/deps/rabbit/src/rabbit_queue_type.erl index b1ea69a0adf..aa25bf1d0b9 100644 --- a/deps/rabbit/src/rabbit_queue_type.erl +++ b/deps/rabbit/src/rabbit_queue_type.erl @@ -275,18 +275,27 @@ short_alias_of(<<"quorum">>) -> <<"quorum">>; short_alias_of(rabbit_quorum_queue) -> <<"quorum">>; +%% AMQP 1.0 management client +short_alias_of({utf8, <<"quorum">>}) -> + <<"quorum">>; short_alias_of(<<"rabbit_classic_queue">>) -> <<"classic">>; short_alias_of(<<"classic">>) -> <<"classic">>; short_alias_of(rabbit_classic_queue) -> <<"classic">>; +%% AMQP 1.0 management client +short_alias_of({utf8, <<"classic">>}) -> + <<"classic">>; short_alias_of(<<"rabbit_stream_queue">>) -> <<"stream">>; short_alias_of(<<"stream">>) -> <<"stream">>; short_alias_of(rabbit_stream_queue) -> <<"stream">>; +%% AMQP 1.0 management client +short_alias_of({utf8, <<"stream">>}) -> + <<"stream">>; short_alias_of(_Other) -> undefined. diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 946c750ec47..a363fbcda48 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -143,6 +143,7 @@ all_tests() -> idempotent_recover, vhost_with_quorum_queue_is_deleted, vhost_with_default_queue_type_declares_quorum_queue, + node_wide_default_queue_type_declares_quorum_queue, delete_immediately_by_resource, consume_redelivery_count, subscribe_redelivery_count, @@ -597,7 +598,7 @@ start_queue_concurrent(Config) -> quorum_cluster_size_3(Config) -> case rabbit_ct_helpers:is_mixed_versions() of true -> - {skip, "quorum_cluster_size_3 tests isn't mixed version reliable"}; + {skip, "quorum_cluster_size_3 test isn't mixed version reliable"}; false -> quorum_cluster_size_x(Config, 3, 3) end. @@ -822,6 +823,40 @@ vhost_with_default_queue_type_declares_quorum_queue(Config) -> amqp_connection:close(Conn), ok. +node_wide_default_queue_type_declares_quorum_queue(Config) -> + case rabbit_ct_helpers:is_mixed_versions() of + true -> + {skip, "node_wide_default_queue_type_declares_quorum_queue test isn't mixed version compatible"}; + false -> + node_wide_default_queue_type_declares_quorum_queue0(Config) + end. + +node_wide_default_queue_type_declares_quorum_queue0(Config) -> + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + rpc:call(Node, application, set_env, [rabbit, default_queue_type, rabbit_quorum_queue]), + VHost = atom_to_binary(?FUNCTION_NAME, utf8), + QName = atom_to_binary(?FUNCTION_NAME, utf8), + User = ?config(rmq_username, Config), + + AddVhostArgs = [VHost, #{}, User], + ok = rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_vhost, add, + AddVhostArgs), + ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost), + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node, VHost), + {ok, Ch} = amqp_connection:open_channel(Conn), + ?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [])), + assert_queue_type(Node, VHost, QName, rabbit_quorum_queue), + ?assertEqual({'queue.declare_ok', QName, 0, 0}, declare(Ch, QName, [])), + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', QName, 0, 0}, declare_passive(Ch, QName, [])), + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare_passive(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + amqp_connection:close(Conn), + + rpc:call(Node, application, set_env, [rabbit, default_queue_type, rabbit_classic_queue]), + ok. + restart_all_types(Config) -> %% Test the node restart with both types of queues (quorum and classic) to %% ensure there are no regressions @@ -1049,7 +1084,7 @@ shrink_all(Config) -> rebalance(Config) -> case rabbit_ct_helpers:is_mixed_versions() of true -> - {skip, "rebalance tests isn't mixed version compatible"}; + {skip, "rebalance test isn't mixed version compatible"}; false -> rebalance0(Config) end. @@ -1512,7 +1547,7 @@ leadership_takeover(Config) -> metrics_cleanup_on_leadership_takeover(Config) -> case rabbit_ct_helpers:is_mixed_versions() of true -> - {skip, "metrics_cleanup_on_leadership_takeover tests isn't mixed version compatible"}; + {skip, "metrics_cleanup_on_leadership_takeover test isn't mixed version compatible"}; false -> metrics_cleanup_on_leadership_takeover0(Config) end. diff --git a/deps/rabbitmq_ct_helpers/src/rabbit_mgmt_test_util.erl b/deps/rabbitmq_ct_helpers/src/rabbit_mgmt_test_util.erl index 81419dedf79..d43dbff4061 100644 --- a/deps/rabbitmq_ct_helpers/src/rabbit_mgmt_test_util.erl +++ b/deps/rabbitmq_ct_helpers/src/rabbit_mgmt_test_util.erl @@ -267,7 +267,10 @@ atomize_map_keys(I) -> %% @todo There wasn't a specific order before; now there is; maybe we shouldn't have one? assert_list(Exp, Act) -> - case length(Exp) == length(Act) of + %% allow actual map to include keys we do not assert on + %% but not the other way around: we may want to only assert on a subset + %% of keys + case length(Act) >= length(Exp) of true -> ok; false -> error({expected, Exp, actual, Act}) end, diff --git a/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl b/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl index 444e01b92eb..8dd889ae6bb 100644 --- a/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl +++ b/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl @@ -1077,21 +1077,24 @@ queues_test(Config) -> durable => true, auto_delete => false, exclusive => false, - arguments => #{}}, + arguments => #{'x-queue-type' => <<"classic">>} + }, #{name => <<"foo">>, vhost => <<"downvhost">>, state => <<"stopped">>, durable => true, auto_delete => false, exclusive => false, - arguments => #{}}], DownQueues), + arguments => #{'x-queue-type' => <<"classic">>} + }], DownQueues), assert_item(#{name => <<"foo">>, vhost => <<"downvhost">>, state => <<"stopped">>, durable => true, auto_delete => false, exclusive => false, - arguments => #{}}, DownQueue), + arguments => #{'x-queue-type' => <<"classic">>} + }, DownQueue), http_put(Config, "/queues/badvhost/bar", Good, ?NOT_FOUND), http_put(Config, "/queues/%2F/bar", @@ -1113,26 +1116,18 @@ queues_test(Config) -> durable => true, auto_delete => false, exclusive => false, - arguments => #{}, + arguments => #{'x-queue-type' => <<"classic">>}, storage_version => 1}, #{name => <<"foo">>, vhost => <<"/">>, durable => true, auto_delete => false, exclusive => false, - arguments => #{}, + arguments => #{'x-queue-type' => <<"classic">>}, storage_version => 1}], Queues), assert_item(#{name => <<"foo">>, - vhost => <<"/">>, - durable => true, - auto_delete => false, - exclusive => false, - arguments => #{}, + arguments => #{'x-queue-type' => <<"classic">>}, storage_version => 1}, Queue), - - http_delete(Config, "/queues/%2F/foo", {group, '2xx'}), - http_delete(Config, "/queues/%2F/baz", {group, '2xx'}), - http_delete(Config, "/queues/%2F/foo", ?NOT_FOUND), http_get(Config, "/queues/badvhost", ?NOT_FOUND), http_delete(Config, "/queues/downvhost/foo", {group, '2xx'}), @@ -2157,7 +2152,8 @@ exclusive_queue_test(Config) -> durable => false, auto_delete => false, exclusive => true, - arguments => #{}}, Queue), + arguments => #{'x-queue-type' => <<"classic">>} + }, Queue), amqp_channel:close(Ch), close_connection(Conn), @@ -2667,7 +2663,7 @@ columns_test(Config) -> http_delete(Config, Path, [{group, '2xx'}, 404]), http_put(Config, Path, [{arguments, [{<<"x-message-ttl">>, TTL}]}], {group, '2xx'}), - Item = #{arguments => #{'x-message-ttl' => TTL}, name => <<"columns.test">>}, + Item = #{arguments => #{'x-message-ttl' => TTL, 'x-queue-type' => <<"classic">>}, name => <<"columns.test">>}, timer:sleep(2000), [Item] = http_get(Config, "/queues?columns=arguments.x-message-ttl,name", ?OK), Item = http_get(Config, "/queues/vh.tests.columns_test/columns.test?columns=arguments.x-message-ttl,name", ?OK), diff --git a/deps/rabbitmq_management/test/rabbit_mgmt_only_http_SUITE.erl b/deps/rabbitmq_management/test/rabbit_mgmt_only_http_SUITE.erl index 8d4477a0025..573156e2d01 100644 --- a/deps/rabbitmq_management/test/rabbit_mgmt_only_http_SUITE.erl +++ b/deps/rabbitmq_management/test/rabbit_mgmt_only_http_SUITE.erl @@ -382,21 +382,23 @@ queues_test(Config) -> durable => true, auto_delete => false, exclusive => false, - arguments => #{}}, + arguments => #{'x-queue-type' => <<"classic">>} + }, #{name => <<"foo">>, vhost => <<"downvhost">>, state => <<"stopped">>, durable => true, auto_delete => false, exclusive => false, - arguments => #{}}], DownQueues), + arguments => #{'x-queue-type' => <<"classic">>} + }], DownQueues), assert_item(#{name => <<"foo">>, vhost => <<"downvhost">>, state => <<"stopped">>, durable => true, auto_delete => false, exclusive => false, - arguments => #{}}, DownQueue), + arguments => #{'x-queue-type' => <<"classic">>}}, DownQueue), http_put(Config, "/queues/badvhost/bar", Good, ?NOT_FOUND), http_put(Config, "/queues/%2F/bar", @@ -419,7 +421,7 @@ queues_test(Config) -> durable => true, auto_delete => false, exclusive => false, - arguments => #{}, + arguments => #{'x-queue-type' => <<"classic">>}, node => NodeBin}, #{name => <<"foo">>, vhost => <<"/">>, @@ -496,7 +498,7 @@ queues_enable_totals_test(Config) -> durable => true, auto_delete => false, exclusive => false, - arguments => #{}, + arguments => #{'x-queue-type' => <<"classic">>}, node => NodeBin, messages => 1, messages_ready => 1, @@ -920,7 +922,8 @@ exclusive_queue_test(Config) -> durable => false, auto_delete => false, exclusive => true, - arguments => #{}}, Queue), + arguments => #{'x-queue-type' => <<"classic">>} + }, Queue), amqp_channel:close(Ch), close_connection(Conn), passed. @@ -1552,7 +1555,7 @@ columns_test(Config) -> http_delete(Config, Path, [{group, '2xx'}, 404]), http_put(Config, Path, [{arguments, [{<<"x-message-ttl">>, TTL}]}], {group, '2xx'}), - Item = #{arguments => #{'x-message-ttl' => TTL}, name => <<"columns.test">>}, + Item = #{arguments => #{'x-message-ttl' => TTL, 'x-queue-type' => <<"classic">>}, name => <<"columns.test">>}, timer:sleep(2000), [Item] = http_get(Config, "/queues?columns=arguments.x-message-ttl,name", ?OK), Item = http_get(Config, "/queues/%2F/columns.test?columns=arguments.x-message-ttl,name", ?OK),