From 54c9b85836199597fb3b633fdeb2d725ab9d45ad Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Thu, 10 May 2018 13:43:39 -0500 Subject: [PATCH 1/7] Make policy validation aware of the max-priority argument References #1590. [#157380396] --- src/rabbit_policies.erl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index f48189b210cf..0faad7259d6f 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -41,6 +41,7 @@ register() -> {policy_validator, <<"expires">>}, {policy_validator, <<"max-length">>}, {policy_validator, <<"max-length-bytes">>}, + {policy_validator, <<"max-priority">>}, {policy_validator, <<"queue-mode">>}, {policy_validator, <<"overflow">>}, {operator_policy_validator, <<"expires">>}, @@ -100,6 +101,12 @@ validate_policy0(<<"max-length-bytes">>, Value) validate_policy0(<<"max-length-bytes">>, Value) -> {error, "~p is not a valid maximum length in bytes", [Value]}; +validate_policy0(<<"max-priority">>, Value) + when is_integer(Value), Value >= 0, Value =< 255 -> + ok; +validate_policy0(<<"max-priority">>, Value) -> + {error, "~p is not a valid max priority (must be an integer in the 1-255 range)", [Value]}; + validate_policy0(<<"queue-mode">>, <<"default">>) -> ok; validate_policy0(<<"queue-mode">>, <<"lazy">>) -> @@ -117,4 +124,3 @@ merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"max-length-bytes">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal). - From 08636f98cef491c62bf67d6b865333d65f0d3b9f Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Fri, 11 May 2018 00:05:07 -0500 Subject: [PATCH 2/7] Reject max-priority arguments >= 256 This is the value we advertise in the docs and it should be enforced to avoid process explosion e.g. when an overflow value is provided. Part of #1590. [#157380396] --- src/rabbit_amqqueue.erl | 9 ++++++++- src/rabbit_priority_queue.erl | 13 ++++++++----- test/priority_queue_SUITE.erl | 29 +++++++++++++++++++++++++++-- 3 files changed, 43 insertions(+), 8 deletions(-) diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ac79d56358b1..eb00729baa6a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -575,7 +575,7 @@ declare_args() -> {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, {<<"x-max-length">>, fun check_non_neg_int_arg/2}, {<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2}, - {<<"x-max-priority">>, fun check_non_neg_int_arg/2}, + {<<"x-max-priority">>, fun check_max_priority_arg/2}, {<<"x-overflow">>, fun check_overflow/2}, {<<"x-queue-mode">>, fun check_queue_mode/2}]. @@ -611,6 +611,13 @@ check_message_ttl_arg({Type, Val}, Args) -> Error -> Error end. +check_max_priority_arg({Type, Val}, Args) -> + case check_non_neg_int_arg({Type, Val}, Args) of + ok when Val =< 255 -> ok; + ok -> {error, {max_value_exceeded, Val}}; + Error -> Error + end. + %% Note that the validity of x-dead-letter-exchange is already verified %% by rabbit_channel's queue.declare handler. check_dlxname_arg({longstr, _}, _) -> ok; diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index 5786bed6ba38..b1eb83dddc7a 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -128,11 +128,14 @@ collapse_recovery(QNames, DupNames, Recovery) -> priorities(#amqqueue{arguments = Args}) -> Ints = [long, short, signedint, byte, unsignedbyte, unsignedshort, unsignedint], case rabbit_misc:table_lookup(Args, <<"x-max-priority">>) of - {Type, Max} -> case lists:member(Type, Ints) of - false -> none; - true -> lists:reverse(lists:seq(0, Max)) - end; - _ -> none + {Type, RequestedMax} -> + case lists:member(Type, Ints) of + false -> none; + true -> + Max = min(RequestedMax, ?MAX_SUPPORTED_PRIORITY), + lists:reverse(lists:seq(0, Max)) + end; + _ -> none end. %%---------------------------------------------------------------------------- diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl index a1ae66dbbb2d..cfa98b94c7b2 100644 --- a/test/priority_queue_SUITE.erl +++ b/test/priority_queue_SUITE.erl @@ -17,6 +17,7 @@ -module(priority_queue_SUITE). -include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). -compile(export_all). @@ -46,7 +47,9 @@ groups() -> simple_order, straight_through, invoke, - gen_server2_stats + gen_server2_stats, + negative_max_priorities, + max_priorities_above_hard_limit ]}, {cluster_size_3, [], [ mirror_queue_auto_ack, @@ -192,6 +195,28 @@ straight_through(Config) -> rabbit_ct_client_helpers:close_connection(Conn), passed. +max_priorities_above_hard_limit(Config) -> + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Q = <<"max_priorities_above_hard_limit">>, + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + %% Note that lower values (e.g. 300) will cause overflow the byte type here. + %% However, values >= 256 would still be rejected when used by + %% other clients + declare(Ch, Q, 3000)), + rabbit_ct_client_helpers:close_connection(Conn), + passed. + +negative_max_priorities(Config) -> + {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Q = <<"negative_max_priorities">>, + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(Ch, Q, -10)), + rabbit_ct_client_helpers:close_connection(Conn), + passed. + + invoke(Config) -> %% Synthetic test to check the invoke callback, as the bug tested here %% is only triggered with a race condition. @@ -669,7 +694,7 @@ get_ok(Ch, Q, Ack, PBin) -> {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = PBin2}} = amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = Ack =:= no_ack}), - PBin = PBin2, + ?assertEqual(PBin, PBin2), maybe_ack(Ch, Ack, DTag). get_payload(Ch, Q, Ack, Ps) -> From f5aa1fbe043395806d9b9ed8780892924431466c Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Sun, 13 May 2018 01:32:39 -0500 Subject: [PATCH 3/7] Take policy-configured max-priority into account Part of #1590. --- src/rabbit_amqqueue_process.erl | 4 ++++ src/rabbit_priority_queue.erl | 18 ++++++++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a3c8f9951935..746679168f0e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -382,6 +382,7 @@ process_args_policy(State = #q{q = Q, {<<"message-ttl">>, fun res_min/2, fun init_ttl/2}, {<<"max-length">>, fun res_min/2, fun init_max_length/2}, {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2}, + {<<"max-priority">>, fun res_arg/2, fun init_max_priority/2}, {<<"overflow">>, fun res_arg/2, fun init_overflow/2}, {<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2}], drop_expired_msgs( @@ -426,6 +427,9 @@ init_max_bytes(MaxBytes, State) -> {_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}), State1. +init_max_priority(_MaxPriority, State) -> + State. + init_overflow(undefined, State) -> State; init_overflow(Overflow, State) -> diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index b1eb83dddc7a..481fd9a390e6 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -28,6 +28,8 @@ {requires, pre_boot}, {enables, kernel_ready}]}). +-import(rabbit_misc, [pget/2]). + -export([enable/0]). -export([start/2, stop/1]). @@ -43,6 +45,8 @@ info/2, invoke/3, is_duplicate/2, set_queue_mode/2, zip_msgs_and_acks/4, handle_info/2]). +-export([max_priority/1, priorities/1]). + -record(state, {bq, bqss, max_priority}). -record(passthrough, {bq, bqs}). @@ -125,9 +129,19 @@ collapse_recovery(QNames, DupNames, Recovery) -> end, dict:new(), lists:zip(DupNames, Recovery)), [dict:fetch(Name, NameToTerms) || Name <- QNames]. -priorities(#amqqueue{arguments = Args}) -> - Ints = [long, short, signedint, byte, unsignedbyte, unsignedshort, unsignedint], +max_priority(Q = #amqqueue{arguments = Args}) -> case rabbit_misc:table_lookup(Args, <<"x-max-priority">>) of + {Type, RequestedMax} -> {Type, RequestedMax}; + undefined -> + case rabbit_policy:effective_definition(Q) of + undefined -> undefined; + Proplist -> {unsignedbyte, pget(<<"max-priority">>, Proplist)} + end + end. + +priorities(Q) -> + Ints = [long, short, signedint, byte, unsignedbyte, unsignedshort, unsignedint], + case max_priority(Q) of {Type, RequestedMax} -> case lists:member(Type, Ints) of false -> none; From 44500061d465a8552a07af5a3a519d2263f94f8a Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 14 May 2018 12:37:39 -0300 Subject: [PATCH 4/7] Use the constant across the board Part of #1590. --- src/rabbit_policies.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index 0faad7259d6f..3ed6e33279ae 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -102,10 +102,10 @@ validate_policy0(<<"max-length-bytes">>, Value) -> {error, "~p is not a valid maximum length in bytes", [Value]}; validate_policy0(<<"max-priority">>, Value) - when is_integer(Value), Value >= 0, Value =< 255 -> + when is_integer(Value), Value >= 0, Value =< ?MAX_SUPPORTED_PRIORITY -> ok; validate_policy0(<<"max-priority">>, Value) -> - {error, "~p is not a valid max priority (must be an integer in the 1-255 range)", [Value]}; + {error, "~p is not a valid max priority (must be an integer in the 1-~p range)", [Value, ?MAX_SUPPORTED_PRIORITY]}; validate_policy0(<<"queue-mode">>, <<"default">>) -> ok; From 08168de6b0c3c4d7f6429f2c3d098e5080ff7b1a Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 14 May 2018 15:06:52 -0300 Subject: [PATCH 5/7] Revert "Take policy-configured max-priority into account" This reverts commit f5aa1fbe043395806d9b9ed8780892924431466c. This feature wasn't available in the original implementation for a reason: policies are dynamic and can change after a queue's been declared. However, queue priorities are (at least currently) set in stone from the moment of queue creation. This was mentioned in the docs but not explicitly enough and got overlooked. Credit for the [re-]discovery goes to @acogoluegnes :) References #1590. [#157380396] --- src/rabbit_amqqueue_process.erl | 4 ---- src/rabbit_priority_queue.erl | 18 ++---------------- 2 files changed, 2 insertions(+), 20 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 746679168f0e..a3c8f9951935 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -382,7 +382,6 @@ process_args_policy(State = #q{q = Q, {<<"message-ttl">>, fun res_min/2, fun init_ttl/2}, {<<"max-length">>, fun res_min/2, fun init_max_length/2}, {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2}, - {<<"max-priority">>, fun res_arg/2, fun init_max_priority/2}, {<<"overflow">>, fun res_arg/2, fun init_overflow/2}, {<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2}], drop_expired_msgs( @@ -427,9 +426,6 @@ init_max_bytes(MaxBytes, State) -> {_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}), State1. -init_max_priority(_MaxPriority, State) -> - State. - init_overflow(undefined, State) -> State; init_overflow(Overflow, State) -> diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index 481fd9a390e6..b1eb83dddc7a 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -28,8 +28,6 @@ {requires, pre_boot}, {enables, kernel_ready}]}). --import(rabbit_misc, [pget/2]). - -export([enable/0]). -export([start/2, stop/1]). @@ -45,8 +43,6 @@ info/2, invoke/3, is_duplicate/2, set_queue_mode/2, zip_msgs_and_acks/4, handle_info/2]). --export([max_priority/1, priorities/1]). - -record(state, {bq, bqss, max_priority}). -record(passthrough, {bq, bqs}). @@ -129,19 +125,9 @@ collapse_recovery(QNames, DupNames, Recovery) -> end, dict:new(), lists:zip(DupNames, Recovery)), [dict:fetch(Name, NameToTerms) || Name <- QNames]. -max_priority(Q = #amqqueue{arguments = Args}) -> - case rabbit_misc:table_lookup(Args, <<"x-max-priority">>) of - {Type, RequestedMax} -> {Type, RequestedMax}; - undefined -> - case rabbit_policy:effective_definition(Q) of - undefined -> undefined; - Proplist -> {unsignedbyte, pget(<<"max-priority">>, Proplist)} - end - end. - -priorities(Q) -> +priorities(#amqqueue{arguments = Args}) -> Ints = [long, short, signedint, byte, unsignedbyte, unsignedshort, unsignedint], - case max_priority(Q) of + case rabbit_misc:table_lookup(Args, <<"x-max-priority">>) of {Type, RequestedMax} -> case lists:member(Type, Ints) of false -> none; From 9f9a414bef538f86f64b79eb46f60cd5d2c0483a Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 14 May 2018 15:44:25 -0300 Subject: [PATCH 6/7] Follow-up to 08168de6b0c3c4d7f6429f2c3d098e5080ff7b1a References #1590. [#157380396] --- src/rabbit_policies.erl | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index 3ed6e33279ae..43f59f17d591 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -41,7 +41,6 @@ register() -> {policy_validator, <<"expires">>}, {policy_validator, <<"max-length">>}, {policy_validator, <<"max-length-bytes">>}, - {policy_validator, <<"max-priority">>}, {policy_validator, <<"queue-mode">>}, {policy_validator, <<"overflow">>}, {operator_policy_validator, <<"expires">>}, @@ -101,12 +100,6 @@ validate_policy0(<<"max-length-bytes">>, Value) validate_policy0(<<"max-length-bytes">>, Value) -> {error, "~p is not a valid maximum length in bytes", [Value]}; -validate_policy0(<<"max-priority">>, Value) - when is_integer(Value), Value >= 0, Value =< ?MAX_SUPPORTED_PRIORITY -> - ok; -validate_policy0(<<"max-priority">>, Value) -> - {error, "~p is not a valid max priority (must be an integer in the 1-~p range)", [Value, ?MAX_SUPPORTED_PRIORITY]}; - validate_policy0(<<"queue-mode">>, <<"default">>) -> ok; validate_policy0(<<"queue-mode">>, <<"lazy">>) -> From 75cee2c119fe257cf70e876955c9bb201a9d2b38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 15 May 2018 15:51:20 +0200 Subject: [PATCH 7/7] Use MAX_SUPPORTED_PRIORITY constant for argument validation Part of #1590. --- src/rabbit_amqqueue.erl | 6 +- src/rabbit_ctl_usage.erl | 135 +++++++++++++++++++++++++++++++++++ src/rabbit_plugins_usage.erl | 14 ++++ 3 files changed, 152 insertions(+), 3 deletions(-) create mode 100644 src/rabbit_ctl_usage.erl create mode 100644 src/rabbit_plugins_usage.erl diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index eb00729baa6a..381f73376319 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -613,9 +613,9 @@ check_message_ttl_arg({Type, Val}, Args) -> check_max_priority_arg({Type, Val}, Args) -> case check_non_neg_int_arg({Type, Val}, Args) of - ok when Val =< 255 -> ok; - ok -> {error, {max_value_exceeded, Val}}; - Error -> Error + ok when Val =< ?MAX_SUPPORTED_PRIORITY -> ok; + ok -> {error, {max_value_exceeded, Val}}; + Error -> Error end. %% Note that the validity of x-dead-letter-exchange is already verified diff --git a/src/rabbit_ctl_usage.erl b/src/rabbit_ctl_usage.erl new file mode 100644 index 000000000000..0780705a82c1 --- /dev/null +++ b/src/rabbit_ctl_usage.erl @@ -0,0 +1,135 @@ +%% Generated, do not edit! +-module(rabbit_ctl_usage). +-export([usage/0]). +usage() -> "Usage: +rabbitmqctl [-n ] [-t ] [-q] [] + +Options: + -n node + -q + -t timeout + +Default node is \"rabbit@server\", where server is the local host. On a host +named \"server.example.com\", the node name of the RabbitMQ Erlang node will +usually be rabbit@server (unless RABBITMQ_NODENAME has been set to some +non-default value at broker startup time). The output of hostname -s is usually +the correct suffix to use after the \"@\" sign. See rabbitmq-server(1) for +details of configuring the RabbitMQ broker. + +Quiet output mode is selected with the \"-q\" flag. Informational messages are +suppressed when quiet mode is in effect. + +Operation timeout in seconds. Only applicable to \"list\" commands. Default is +\"infinity\". + +Commands: + stop [] + shutdown + stop_app + start_app + wait + reset + force_reset + rotate_logs + hipe_compile + + join_cluster [--ram] + cluster_status + change_cluster_node_type disc | ram + forget_cluster_node [--offline] + rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2 ...] + update_cluster_nodes clusternode + force_boot + sync_queue [-p ] queue + cancel_sync_queue [-p ] queue + purge_queue [-p ] queue + set_cluster_name name + + add_user + delete_user + change_password + clear_password + authenticate_user + set_user_tags ... + list_users + + add_vhost + delete_vhost + list_vhosts [ ...] + set_permissions [-p ] + clear_permissions [-p ] + list_permissions [-p ] + list_user_permissions + + set_parameter [-p ] + clear_parameter [-p ] + list_parameters [-p ] + set_global_parameter + clear_global_parameter + list_global_parameters + + set_policy [-p ] [--priority ] [--apply-to ] + + clear_policy [-p ] + list_policies [-p ] + + list_queues [-p ] [--offline|--online|--local] [ ...] + list_exchanges [-p ] [ ...] + list_bindings [-p ] [ ...] + list_connections [ ...] + list_channels [ ...] + list_consumers [-p ] + status + node_health_check + environment + report + eval + + close_connection + trace_on [-p ] + trace_off [-p ] + set_vm_memory_high_watermark + set_vm_memory_high_watermark absolute + set_disk_free_limit + set_disk_free_limit mem_relative + encode [--decode] [] [] [--list-ciphers] [--list-hashes] +[--cipher ] [--hash ] [--iterations ] + decode [] [][--cipher ] [--hash ] +[--iterations ] + list_hashes + list_ciphers + + must be a member of the list [name, tracing]. + +The list_queues, list_exchanges and list_bindings commands accept an optional +virtual host parameter for which to display results. The default value is \"/\". + + must be a member of the list [name, durable, auto_delete, +arguments, policy, pid, owner_pid, exclusive, exclusive_consumer_pid, +exclusive_consumer_tag, messages_ready, messages_unacknowledged, messages, +messages_ready_ram, messages_unacknowledged_ram, messages_ram, +messages_persistent, message_bytes, message_bytes_ready, +message_bytes_unacknowledged, message_bytes_ram, message_bytes_persistent, +head_message_timestamp, disk_reads, disk_writes, consumers, +consumer_utilisation, memory, slave_pids, synchronised_slave_pids, state]. + + must be a member of the list [name, type, durable, +auto_delete, internal, arguments, policy]. + + must be a member of the list [source_name, source_kind, +destination_name, destination_kind, routing_key, arguments]. + + must be a member of the list [pid, name, port, host, +peer_port, peer_host, ssl, ssl_protocol, ssl_key_exchange, ssl_cipher, +ssl_hash, peer_cert_subject, peer_cert_issuer, peer_cert_validity, state, +channels, protocol, auth_mechanism, user, vhost, timeout, frame_max, +channel_max, client_properties, recv_oct, recv_cnt, send_oct, send_cnt, +send_pend, connected_at]. + + must be a member of the list [pid, connection, name, number, +user, vhost, transactional, confirm, consumer_count, messages_unacknowledged, +messages_uncommitted, acks_uncommitted, messages_unconfirmed, prefetch_count, +global_prefetch_count]. + + +". diff --git a/src/rabbit_plugins_usage.erl b/src/rabbit_plugins_usage.erl new file mode 100644 index 000000000000..6d93cf58e214 --- /dev/null +++ b/src/rabbit_plugins_usage.erl @@ -0,0 +1,14 @@ +%% Generated, do not edit! +-module(rabbit_plugins_usage). +-export([usage/0]). +usage() -> "Usage: +rabbitmq-plugins [-n ] [] + +Commands: + list [-v] [-m] [-E] [-e] [] + enable [--offline] [--online] ... + disable [--offline] [--online] ... + set [--offline] [--online] ... + + +".