From 6266e648c22b6e7a22e61e70774f85d4b66b786c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Fri, 19 Apr 2024 15:08:11 +0200 Subject: [PATCH 1/9] rabbitmq_ct_helpers: Fix handling of unset env. variables in exec/2. [Why] A variable can be set to `false` to explicitly unset it in the child process. This was ignored and converted to the string "false". [How] We special-case `false` and leave it as is. --- deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl b/deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl index 4763ca3c2c0b..4f8ad0fd7c22 100644 --- a/deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl +++ b/deps/rabbitmq_ct_helpers/src/rabbit_ct_helpers.erl @@ -880,7 +880,10 @@ exec([Cmd | Args], Options) when is_list(Cmd) orelse is_binary(Cmd) -> Env1 = [ begin Key1 = format_arg(Key), - Value1 = format_arg(Value), + Value1 = case Value of + false -> false; + _ -> format_arg(Value) + end, Value2 = case is_binary(Value1) of true -> binary_to_list(Value1); false -> Value1 @@ -894,8 +897,10 @@ exec([Cmd | Args], Options) when is_list(Cmd) orelse is_binary(Cmd) -> | proplists:delete(env, PortOptions1)], Log ++ "~n~nEnvironment variables:~n" ++ string:join( - [rabbit_misc:format(" ~ts=~ts", [K, string:replace(V, "~", "~~", all)]) - || {K, V} <- Env1], + [rabbit_misc:format( + " ~ts=~ts", + [K, string:replace(V, "~", "~~", all)]) + || {K, V} <- Env1, is_list(V) ], "~n") } end, From 50b490100d0ae979c10fca6a70bea4e0fa592393 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Fri, 19 Apr 2024 15:09:39 +0200 Subject: [PATCH 2/9] rabbitmq_peer_discovery_etcd: Add clustering testcases [Why] The existing testsuite tried if the communication with an etcd node would work, but didn't test an actual cluster formation. 
[How] The new testcases try to create a cluster using the local etcd node started by the testsuite. The first one starts one RabbitMQ node at a time. The second one starts all of them concurrently. While here, use the etcd source code added as a Git submodule in a previous commit to compile etcd locally just for the testsuite. --- deps/rabbitmq_peer_discovery_etcd/BUILD.bazel | 2 +- .../test/system_SUITE.erl | 297 ++++++++++++++++-- .../test/system_SUITE_data/.gitignore | 1 + 3 files changed, 269 insertions(+), 31 deletions(-) create mode 100644 deps/rabbitmq_peer_discovery_etcd/test/system_SUITE_data/.gitignore diff --git a/deps/rabbitmq_peer_discovery_etcd/BUILD.bazel b/deps/rabbitmq_peer_discovery_etcd/BUILD.bazel index d36795154044..eea80562a689 100644 --- a/deps/rabbitmq_peer_discovery_etcd/BUILD.bazel +++ b/deps/rabbitmq_peer_discovery_etcd/BUILD.bazel @@ -99,7 +99,7 @@ rabbitmq_integration_suite( rabbitmq_integration_suite( name = "system_SUITE", - size = "medium", + size = "large", ) rabbitmq_suite( diff --git a/deps/rabbitmq_peer_discovery_etcd/test/system_SUITE.erl b/deps/rabbitmq_peer_discovery_etcd/test/system_SUITE.erl index 36e67cfb64db..186992280cf8 100644 --- a/deps/rabbitmq_peer_discovery_etcd/test/system_SUITE.erl +++ b/deps/rabbitmq_peer_discovery_etcd/test/system_SUITE.erl @@ -9,19 +9,33 @@ -module(system_SUITE). --compile(export_all). - -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include("rabbit_peer_discovery_etcd.hrl"). --import(rabbit_data_coercion, [to_binary/1, to_integer/1]). +-define(ETCD_GIT_REPO, "https://github.com/etcd-io/etcd.git"). +-define(ETCD_GIT_REF, "v3.5.13"). + +-export([all/0, + groups/0, + init_per_suite/1, + end_per_suite/1, + init_per_group/2, + end_per_group/2, + init_per_testcase/2, + end_per_testcase/2, + etcd_connection_sanity_check_test/1, + init_opens_a_connection_test/1, + registration_with_locking_test/1, + start_one_member_at_a_time/1, + start_members_concurrently/1]). 
all() -> [ - {group, v3_client} + {group, v3_client}, + {group, clustering} ]. groups() -> @@ -30,47 +44,178 @@ groups() -> etcd_connection_sanity_check_test, init_opens_a_connection_test, registration_with_locking_test - ]} + ]}, + {clustering, [], [start_one_member_at_a_time, + start_members_concurrently]} ]. init_per_suite(Config) -> rabbit_ct_helpers:log_environment(), - rabbit_ct_helpers:run_setup_steps(Config, [fun init_etcd/1]). + rabbit_ct_helpers:run_setup_steps( + Config, + [fun clone_etcd/1, + fun compile_etcd/1, + fun start_etcd/1]). end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config, [fun stop_etcd/1]). -init_etcd(Config) -> +init_per_group(clustering, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{rmq_nodes_count, 3}, + {rmq_nodes_clustered, false}]); +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, Config) -> + Config. + +init_per_testcase(Testcase, Config) + when Testcase =:= start_one_member_at_a_time orelse + Testcase =:= start_members_concurrently -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase), + ClusterSize = ?config(rmq_nodes_count, Config), + Config1 = rabbit_ct_helpers:set_config( + Config, + [{rmq_nodename_suffix, Testcase}, + {tcp_ports_base, {skip_n_nodes, + TestNumber * ClusterSize}} + ]), + Config2 = rabbit_ct_helpers:merge_app_env( + Config1, {rabbit, [{log, [{file, [{level, debug}]}]}]}), + Config3 = rabbit_ct_helpers:run_steps( + Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()), + try + _ = rabbit_ct_broker_helpers:rpc_all( + Config3, rabbit_peer_discovery_backend, api_version, []), + Config3 + catch + error:{exception, undef, + [{rabbit_peer_discovery_backend, api_version, _, _} | _]} -> + Config4 = rabbit_ct_helpers:run_steps( + Config3, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + 
rabbit_ct_helpers:testcase_finished(Config4, Testcase), + {skip, + "Some nodes use the old discover->register order; " + "the testcase would likely fail"} + end; +init_per_testcase(_Testcase, Config) -> + Config. + +end_per_testcase(Testcase, Config) + when Testcase =:= start_one_member_at_a_time orelse + Testcase =:= start_members_concurrently -> + Config1 = rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase); +end_per_testcase(_Testcase, Config) -> + Config. + +clone_etcd(Config) -> DataDir = ?config(data_dir, Config), - PrivDir = ?config(priv_dir, Config), - TcpPort = 25389, - EtcdDir = filename:join([PrivDir, "etcd"]), - InitEtcd = filename:join([DataDir, "init-etcd.sh"]), - Cmd = [InitEtcd, EtcdDir, {"~b", [TcpPort]}], + EtcdSrcdir = filename:join(DataDir, "etcd"), + Cmd = case filelib:is_dir(EtcdSrcdir) of + true -> + ct:pal( + "Checking out etcd Git reference, ref = ~s", + [?ETCD_GIT_REF]), + ["git", "-C", EtcdSrcdir, + "checkout", ?ETCD_GIT_REF]; + false -> + ct:pal( + "Cloning etcd Git repository, ref = ~s", + [?ETCD_GIT_REF]), + ["git", "clone", + "--branch", ?ETCD_GIT_REF, + ?ETCD_GIT_REPO, EtcdSrcdir] + end, case rabbit_ct_helpers:exec(Cmd) of - {ok, Stdout} -> - case re:run(Stdout, "^ETCD_PID=([0-9]+)$", [{capture, all_but_first, list}, multiline]) of - {match, [EtcdPid]} -> - ct:pal(?LOW_IMPORTANCE, "etcd PID: ~ts~netcd is listening on: ~b", [EtcdPid, TcpPort]), - rabbit_ct_helpers:set_config(Config, [{etcd_pid, EtcdPid}, - {etcd_endpoints, [rabbit_misc:format("localhost:~tp", [TcpPort])]}, - {etcd_port, TcpPort}]); - nomatch -> - ct:pal(?HI_IMPORTANCE, "init-etcd.sh output did not match what's expected: ~tp", [Stdout]) - end; - {error, Code, Reason} -> - ct:pal(?HI_IMPORTANCE, "init-etcd.sh exited with code ~tp: ~tp", [Code, Reason]), - _ = rabbit_ct_helpers:exec(["pkill", "-INT", "etcd"]), - {skip, "Failed to 
initialize etcd"} + {ok, _} -> + rabbit_ct_helpers:set_config(Config, {etcd_srcdir, EtcdSrcdir}); + {error, _} -> + {skip, "Failed to clone etcd"} end. +compile_etcd(Config) -> + EtcdSrcdir = ?config(etcd_srcdir, Config), + ct:pal("Compiling etcd in ~ts", [EtcdSrcdir]), + Script0 = case os:type() of + {win32, _} -> "build.bat"; + _ -> "build.sh" + end, + Script1 = filename:join(EtcdSrcdir, Script0), + Cmd = [Script1], + GOPATH = filename:join(EtcdSrcdir, "go"), + GOFLAGS = "-modcacherw", + Options = [{cd, EtcdSrcdir}, + {env, [{"BINDIR", false}, + {"GOPATH", GOPATH}, + {"GOFLAGS", GOFLAGS}]}], + case rabbit_ct_helpers:exec(Cmd, Options) of + {ok, _} -> + EtcdExe = case os:type() of + {win32, _} -> "etcd.exe"; + _ -> "etcd" + end, + EtcdBin = filename:join([EtcdSrcdir, "bin", EtcdExe]), + ?assert(filelib:is_regular(EtcdBin)), + rabbit_ct_helpers:set_config(Config, {etcd_bin, EtcdBin}); + {error, _} -> + {skip, "Failed to compile etcd"} + end. + +start_etcd(Config) -> + ct:pal("Starting etcd daemon"), + EtcdBin = ?config(etcd_bin, Config), + PrivDir = ?config(priv_dir, Config), + EtcdDataDir = filename:join(PrivDir, "data.etcd"), + EtcdName = ?MODULE_STRING, + EtcdHost = "localhost", + EtcdClientPort = 2379, + EtcdClientUrl = rabbit_misc:format( + "http://~s:~b", [EtcdHost, EtcdClientPort]), + EtcdAdvPort = 2380, + EtcdAdvUrl = rabbit_misc:format( + "http://~s:~b", [EtcdHost, EtcdAdvPort]), + Cmd = [EtcdBin, + "--data-dir", EtcdDataDir, + "--name", EtcdName, + "--initial-advertise-peer-urls", EtcdAdvUrl, + "--listen-peer-urls", EtcdAdvUrl, + "--advertise-client-urls", EtcdClientUrl, + "--listen-client-urls", EtcdClientUrl, + "--initial-cluster", EtcdName ++ "=" ++ EtcdAdvUrl, + "--initial-cluster-state", "new", + "--initial-cluster-token", "test-token", + "--log-level", "debug", "--log-outputs", "stdout"], + EtcdPid = spawn(fun() -> rabbit_ct_helpers:exec(Cmd) end), + + EtcdEndpoint = rabbit_misc:format("~s:~b", [EtcdHost, EtcdClientPort]), + 
rabbit_ct_helpers:set_config( + Config, + [{etcd_pid, EtcdPid}, + {etcd_endpoints, [EtcdEndpoint]}]). + stop_etcd(Config) -> - EtcdPid = ?config(etcd_pid, Config), - Cmd = ["kill", "-INT", EtcdPid], - _ = rabbit_ct_helpers:exec(Cmd), + case rabbit_ct_helpers:get_config(Config, etcd_pid) of + EtcdPid when is_pid(EtcdPid) -> + ct:pal( + "Stopping etcd daemon by killing control process ~p", + [EtcdPid]), + erlang:exit(EtcdPid, kill); + undefined -> + ok + end, Config. - %% %% Test cases %% @@ -128,6 +273,98 @@ registration_with_locking_test(Config) -> gen_statem:stop(Pid) end. +start_one_member_at_a_time(Config) -> + Config1 = configure_peer_discovery(Config), + + Nodes = rabbit_ct_broker_helpers:get_node_configs(Config1, nodename), + lists:foreach( + fun(Node) -> + ?assertEqual( + ok, + rabbit_ct_broker_helpers:start_node(Config1, Node)) + end, Nodes), + + assert_full_cluster(Config1). + +start_members_concurrently(Config) -> + Config1 = configure_peer_discovery(Config), + + Nodes = rabbit_ct_broker_helpers:get_node_configs(Config1, nodename), + Parent = self(), + Pids = lists:map( + fun(Node) -> + spawn_link( + fun() -> + receive + go -> + ?assertEqual( + ok, + rabbit_ct_broker_helpers:start_node( + Config1, Node)), + Parent ! started + end + end) + end, Nodes), + + lists:foreach(fun(Pid) -> Pid ! go end, Pids), + lists:foreach(fun(_Pid) -> receive started -> ok end end, Pids), + + assert_full_cluster(Config1). 
+ +configure_peer_discovery(Config) -> + Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + lists:foreach( + fun(Node) -> + Members = lists:sort( + rabbit_ct_broker_helpers:cluster_members_online( + Config, Node)), + ?assertEqual([Node], Members) + end, Nodes), + + lists:foreach( + fun(Node) -> + ?assertEqual( + ok, + rabbit_ct_broker_helpers:stop_broker(Config, Node)), + ?assertEqual( + ok, + rabbit_ct_broker_helpers:reset_node(Config, Node)), + ?assertEqual( + ok, + rabbit_ct_broker_helpers:stop_node(Config, Node)) + end, Nodes), + + Endpoints = ?config(etcd_endpoints, Config), + Config1 = rabbit_ct_helpers:merge_app_env( + Config, + {rabbit, + [{cluster_formation, + [{peer_discovery_backend, rabbit_peer_discovery_etcd}, + {peer_discovery_etcd, + [{endpoints, Endpoints}, + {etcd_prefix, "rabbitmq"}, + {cluster_name, atom_to_list(?FUNCTION_NAME)}]}]}]}), + lists:foreach( + fun(Node) -> + ?assertEqual( + ok, + rabbit_ct_broker_helpers:rewrite_node_config_file( + Config1, Node)) + end, Nodes), + + Config1. + +assert_full_cluster(Config) -> + Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ExpectedMembers = lists:sort(Nodes), + lists:foreach( + fun(Node) -> + Members = lists:sort( + rabbit_ct_broker_helpers:cluster_members_online( + Config, Node)), + ?assertEqual(ExpectedMembers, Members) + end, Nodes). 
+ %% %% Helpers %% diff --git a/deps/rabbitmq_peer_discovery_etcd/test/system_SUITE_data/.gitignore b/deps/rabbitmq_peer_discovery_etcd/test/system_SUITE_data/.gitignore new file mode 100644 index 000000000000..e22d17a8cc03 --- /dev/null +++ b/deps/rabbitmq_peer_discovery_etcd/test/system_SUITE_data/.gitignore @@ -0,0 +1 @@ +/etcd/ From 684ec76f77083f87ab32b230fa31283a99584957 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Thu, 2 May 2024 10:38:03 +0200 Subject: [PATCH 3/9] rabbitmq_peer_discovery_consul: Separate service name and ID [Why] The `consul_svc` parameter is used as the service name and to construct the service ID. The problem with the way the service ID is constructed is that it doesn't allow registering several distinct RabbitMQ nodes in the same Consul agent. This is a problem for testsuites where we want to run several RabbitMQ nodes on the same host with a single local Consul agent. [How] The service ID now has its own parameter, `consul_svc_id`. If this one is unset, it falls back to the previous construction from the service name. This allows the plugin to remain 100% compatible with previous versions. 
--- .../include/rabbit_peer_discovery_consul.hrl | 5 +++++ .../rabbitmq_peer_discovery_consul.schema | 17 ++++++++++++++++- .../src/rabbit_peer_discovery_consul.erl | 19 ++++++++++++++----- 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/deps/rabbitmq_peer_discovery_consul/include/rabbit_peer_discovery_consul.hrl b/deps/rabbitmq_peer_discovery_consul/include/rabbit_peer_discovery_consul.hrl index 2f1130280c76..9a351bb7c0f7 100644 --- a/deps/rabbitmq_peer_discovery_consul/include/rabbit_peer_discovery_consul.hrl +++ b/deps/rabbitmq_peer_discovery_consul/include/rabbit_peer_discovery_consul.hrl @@ -69,6 +69,11 @@ env_variable = "CONSUL_SVC_ADDR_NODENAME", default_value = false }, + consul_svc_id => #peer_discovery_config_entry_meta{ + type = string, + env_variable = "CONSUL_SVC_ID", + default_value = "undefined" + }, consul_svc_port => #peer_discovery_config_entry_meta{ type = integer, env_variable = "CONSUL_SVC_PORT", diff --git a/deps/rabbitmq_peer_discovery_consul/priv/schema/rabbitmq_peer_discovery_consul.schema b/deps/rabbitmq_peer_discovery_consul/priv/schema/rabbitmq_peer_discovery_consul.schema index 5090c6e9748b..4e5188bb06a4 100644 --- a/deps/rabbitmq_peer_discovery_consul/priv/schema/rabbitmq_peer_discovery_consul.schema +++ b/deps/rabbitmq_peer_discovery_consul/priv/schema/rabbitmq_peer_discovery_consul.schema @@ -140,7 +140,7 @@ fun(Conf) -> end}. -%% use (Erlang) node name when compuing service address? +%% use (Erlang) node name when computing service address? {mapping, "cluster_formation.consul.svc_addr_use_nodename", "rabbit.cluster_formation.peer_discovery_consul.consul_svc_addr_nodename", [ {datatype, {enum, [true, false]}} @@ -155,6 +155,21 @@ fun(Conf) -> end}. +%% service ID + +{mapping, "cluster_formation.consul.svc_id", "rabbit.cluster_formation.peer_discovery_consul.consul_svc_id", [ + {datatype, string} +]}. 
+ +{translation, "rabbit.cluster_formation.peer_discovery_consul.consul_svc_id", +fun(Conf) -> + case cuttlefish:conf_get("cluster_formation.consul.svc_id", Conf, undefined) of + undefined -> cuttlefish:unset(); + Value -> Value + end +end}. + + %% (optionally) append a suffix to node names retrieved from Consul {mapping, "cluster_formation.consul.domain_suffix", "rabbit.cluster_formation.peer_discovery_consul.consul_domain", [ diff --git a/deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl b/deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl index f50eb8fe7df3..ac6e091b6957 100644 --- a/deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl +++ b/deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl @@ -66,7 +66,7 @@ list_nodes() -> end, Fun2 = fun(Proplist) -> M = maps:from_list(Proplist), - Path = rabbit_peer_discovery_httpc:build_path([v1, health, service, get_config_key(consul_svc, M)]), + Path = rabbit_peer_discovery_httpc:build_path([v1, health, service, service_name()]), HttpOpts = http_options(M), case rabbit_peer_discovery_httpc:get(get_config_key(consul_scheme, M), get_config_key(consul_host, M), @@ -335,8 +335,7 @@ registration_body_add_id() -> -spec registration_body_add_name(Payload :: list()) -> list(). registration_body_add_name(Payload) -> - M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), - Name = rabbit_data_coercion:to_atom(get_config_key(consul_svc, M)), + Name = rabbit_data_coercion:to_atom(service_name()), lists:append(Payload, [{'Name', Name}]). -spec registration_body_maybe_add_address(Payload :: list()) @@ -484,14 +483,24 @@ service_address(_, false, NIC, _) -> -spec service_id() -> string(). service_id() -> M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), - service_id(get_config_key(consul_svc, M), - service_address()). 
+ case get_config_key(consul_svc_id, M) of + "undefined" -> + service_id(get_config_key(consul_svc, M), + service_address()); + ID -> + ID + end. -spec service_id(Name :: string(), Address :: string()) -> string(). service_id(Service, "undefined") -> Service; service_id(Service, Address) -> string:join([Service, Address], ":"). +-spec service_name() -> string(). +service_name() -> + M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), + get_config_key(consul_svc, M). + -spec service_ttl(TTL :: integer()) -> string(). service_ttl(Value) -> rabbit_peer_discovery_util:as_string(Value) ++ "s". From 750497c8612d5f33fa003efff7b5ad669deab0a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Thu, 2 May 2024 10:38:49 +0200 Subject: [PATCH 4/9] rabbitmq_peer_discovery_consul: Populate Erlang node name during registration [Why] This allows other nodes to discover the actual node names, instead of deriving one from the Consul agent node name and their own node name. This permits registering several RabbitMQ nodes in the same Consul agent. This is at least handy for testsuites. [How] The Erlang node name is added to the `Meta` properties list as well as the RabbitMQ cluster name. Note that this also fixes when the cluster name is added to `Meta`: before this commit, a non-default cluster name was not added if the user-configured properties list was empty at the beginning. 
--- .../src/rabbit_peer_discovery_consul.erl | 58 ++++++++++--------- .../rabbitmq_peer_discovery_consul_SUITE.erl | 52 ++++++++++++----- 2 files changed, 68 insertions(+), 42 deletions(-) diff --git a/deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl b/deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl index ac6e091b6957..db3699e304e1 100644 --- a/deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl +++ b/deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl @@ -34,6 +34,9 @@ -define(CONSUL_CHECK_NOTES, "RabbitMQ Consul-based peer discovery plugin TTL check"). +-define(META_KEY_CLUSTER_NAME, <<"cluster">>). +-define(META_KEY_ERLANG_NODENAME, <<"erlang-node-name">>). + %% %% API %% @@ -260,15 +263,21 @@ extract_nodes(Data) -> extract_nodes(Data, []). -> list(). extract_nodes([], Nodes) -> Nodes; extract_nodes([H | T], Nodes) -> - Service = maps:get(<<"Service">>, H), - Value = maps:get(<<"Address">>, Service), - NodeName = case ?UTIL_MODULE:as_string(Value) of - "" -> - NodeData = maps:get(<<"Node">>, H), - Node = maps:get(<<"Node">>, NodeData), - maybe_add_domain(?UTIL_MODULE:node_name(Node)); - Address -> - ?UTIL_MODULE:node_name(Address) + Service = maps:get(<<"Service">>, H), + Meta = maps:get(<<"Meta">>, Service, #{}), + NodeName = case Meta of + #{?META_KEY_ERLANG_NODENAME := Node} -> + binary_to_atom(Node); + _ -> + Value = maps:get(<<"Address">>, Service), + case ?UTIL_MODULE:as_string(Value) of + "" -> + NodeData = maps:get(<<"Node">>, H), + Node = maps:get(<<"Node">>, NodeData), + maybe_add_domain(?UTIL_MODULE:node_name(Node)); + Address -> + ?UTIL_MODULE:node_name(Address) + end end, extract_nodes(T, lists:merge(Nodes, [NodeName])). @@ -417,24 +426,19 @@ registration_body_maybe_add_tag(Payload, Cluster, Tags) -> -spec registration_body_maybe_add_meta(Payload :: list()) -> list(). 
registration_body_maybe_add_meta(Payload) -> - M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), - ClusterName = get_config_key(cluster_name, M), - Meta = ?UTIL_MODULE:as_list(get_config_key(consul_svc_meta, M)), - registration_body_maybe_add_meta(Payload, ClusterName, Meta). - --spec registration_body_maybe_add_meta(Payload :: list(), - ClusterName :: string(), - Meta :: list()) -> list(). -registration_body_maybe_add_meta(Payload, "default", []) -> - Payload; -registration_body_maybe_add_meta(Payload, "default", Meta) -> - lists:append(Payload, [{<<"meta">>, Meta}]); -registration_body_maybe_add_meta(Payload, _ClusterName, []) -> - Payload; -registration_body_maybe_add_meta(Payload, ClusterName, Meta) -> - Merged = maps:to_list(maps:merge(#{<<"cluster">> => rabbit_data_coercion:to_binary(ClusterName)}, maps:from_list(Meta))), - lists:append(Payload, [{<<"meta">>, Merged}]). - + M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), + Meta0 = ?UTIL_MODULE:as_list(get_config_key(consul_svc_meta, M)), + Meta1 = maps:from_list(Meta0), + Meta2 = Meta1#{?META_KEY_ERLANG_NODENAME => atom_to_binary(node())}, + Meta3 = case get_config_key(cluster_name, M) of + "default" -> + Meta2; + ClusterName -> + ClusterName1 = rabbit_data_coercion:to_binary(ClusterName), + Meta2#{?META_KEY_CLUSTER_NAME => ClusterName1} + end, + Merged = maps:to_list(Meta3), + lists:append(Payload, [{'Meta', Merged}]). -spec validate_addr_parameters(false | true, false | true) -> false | true. 
validate_addr_parameters(false, true) -> diff --git a/deps/rabbitmq_peer_discovery_consul/test/rabbitmq_peer_discovery_consul_SUITE.erl b/deps/rabbitmq_peer_discovery_consul/test/rabbitmq_peer_discovery_consul_SUITE.erl index f1c5af4a79bf..6b53798e96e4 100644 --- a/deps/rabbitmq_peer_discovery_consul/test/rabbitmq_peer_discovery_consul_SUITE.erl +++ b/deps/rabbitmq_peer_discovery_consul/test/rabbitmq_peer_discovery_consul_SUITE.erl @@ -155,7 +155,9 @@ registration_body_simple_case(_Config) -> {'Check', [{'Notes', ?CONSUL_CHECK_NOTES}, {'TTL', '30s'}, - {'Status', 'passing'}]}], + {'Status', 'passing'}]}, + {'Meta', + [{<<"erlang-node-name">>, atom_to_binary(node())}]}], ?assertEqual(Expectation, rabbit_peer_discovery_consul:build_registration_body()). registration_body_svc_addr_set_via_env_var(_Config) -> @@ -167,7 +169,9 @@ registration_body_svc_addr_set_via_env_var(_Config) -> {'Check', [{'Notes', ?CONSUL_CHECK_NOTES}, {'TTL', '30s'}, - {'Status', 'passing'}]}], + {'Status', 'passing'}]}, + {'Meta', + [{<<"erlang-node-name">>, atom_to_binary(node())}]}], ?assertEqual(Expectation, rabbit_peer_discovery_consul:build_registration_body()). registration_body_svc_ttl_set_via_env_var(_Config) -> @@ -178,7 +182,9 @@ registration_body_svc_ttl_set_via_env_var(_Config) -> {'Check', [{'Notes', ?CONSUL_CHECK_NOTES}, {'TTL', '257s'}, - {'Status', 'passing'}]}], + {'Status', 'passing'}]}, + {'Meta', + [{<<"erlang-node-name">>, atom_to_binary(node())}]}], ?assertEqual(Expectation, rabbit_peer_discovery_consul:build_registration_body()). 
registration_body_svc_tags_set_via_env_var(_Config) -> @@ -190,7 +196,9 @@ registration_body_svc_tags_set_via_env_var(_Config) -> [{'Notes', ?CONSUL_CHECK_NOTES}, {'TTL', '30s'}, {'Status', 'passing'}]}, - {'Tags',['urlprefix-:5672 proto=tcp',mq,'mq server']}], + {'Tags',['urlprefix-:5672 proto=tcp',mq,'mq server']}, + {'Meta', + [{<<"erlang-node-name">>, atom_to_binary(node())}]}], ?assertEqual(Expectation, rabbit_peer_discovery_consul:build_registration_body()). registration_body_deregister_after_set_via_env_var(_Config) -> @@ -202,7 +210,9 @@ registration_body_deregister_after_set_via_env_var(_Config) -> [{'Notes', ?CONSUL_CHECK_NOTES}, {'TTL','30s'}, {'Status', 'passing'}, - {'DeregisterCriticalServiceAfter','520s'}]}], + {'DeregisterCriticalServiceAfter','520s'}]}, + {'Meta', + [{<<"erlang-node-name">>, atom_to_binary(node())}]}], ?assertEqual(Expectation, rabbit_peer_discovery_consul:build_registration_body()). registration_body_ttl_and_deregister_after_both_unset_via_env_var(_Config) -> @@ -210,7 +220,9 @@ registration_body_ttl_and_deregister_after_both_unset_via_env_var(_Config) -> os:putenv("CONSUL_SVC_TTL", ""), Expectation = [{'ID', 'rabbitmq'}, {'Name', rabbitmq}, - {'Port', 5672}], + {'Port', 5672}, + {'Meta', + [{<<"erlang-node-name">>, atom_to_binary(node())}]}], ?assertEqual(Expectation, rabbit_peer_discovery_consul:build_registration_body()). %% "deregister after" won't be enabled if TTL isn't set @@ -219,7 +231,9 @@ registration_body_ttl_unset_and_deregister_after_set_via_env_var(_Config) -> os:putenv("CONSUL_SVC_TTL", ""), Expectation = [{'ID', 'rabbitmq'}, {'Name', rabbitmq}, - {'Port', 5672}], + {'Port', 5672}, + {'Meta', + [{<<"erlang-node-name">>, atom_to_binary(node())}]}], ?assertEqual(Expectation, rabbit_peer_discovery_consul:build_registration_body()). 
service_id_all_defaults_test(_Config) -> @@ -450,7 +464,8 @@ registration_with_all_default_values_test(_Config) -> ?assertEqual("v1/agent/service/register", Path), ?assertEqual([], Args), ?assertEqual([], Headers), - Expect = <<"{\"ID\":\"rabbitmq\",\"Name\":\"rabbitmq\",\"Port\":5672,\"Check\":{\"Notes\":\"RabbitMQ Consul-based peer discovery plugin TTL check\",\"TTL\":\"30s\",\"Status\":\"passing\"}}">>, + Node = atom_to_binary(node()), + Expect = <<"{\"ID\":\"rabbitmq\",\"Name\":\"rabbitmq\",\"Port\":5672,\"Check\":{\"Notes\":\"RabbitMQ Consul-based peer discovery plugin TTL check\",\"TTL\":\"30s\",\"Status\":\"passing\"},\"Meta\":{\"erlang-node-name\":\"", Node/binary, "\"}}">>, ?assertEqual(Expect, Body), {ok, []} end), @@ -467,7 +482,8 @@ registration_with_cluster_name_test(_Config) -> ?assertEqual("v1/agent/service/register", Path), ?assertEqual([], Args), ?assertEqual([], Headers), - Expect = <<"{\"ID\":\"rabbitmq\",\"Name\":\"rabbitmq\",\"Port\":5672,\"Check\":{\"Notes\":\"RabbitMQ Consul-based peer discovery plugin TTL check\",\"TTL\":\"30s\",\"Status\":\"passing\"},\"Tags\":[\"test-rabbit\"]}">>, + Node = atom_to_binary(node()), + Expect = <<"{\"ID\":\"rabbitmq\",\"Name\":\"rabbitmq\",\"Port\":5672,\"Check\":{\"Notes\":\"RabbitMQ Consul-based peer discovery plugin TTL check\",\"TTL\":\"30s\",\"Status\":\"passing\"},\"Tags\":[\"test-rabbit\"],\"Meta\":{\"cluster\":\"test-rabbit\",\"erlang-node-name\":\"", Node/binary, "\"}}">>, ?assertEqual(Expect, Body), {ok, []} end), @@ -484,7 +500,8 @@ registration_without_acl_token_test(_Config) -> ?assertEqual("v1/agent/service/register", Path), ?assertEqual([], Args), ?assertEqual([], Headers), - Expect = <<"{\"ID\":\"rabbit:10.0.0.1\",\"Name\":\"rabbit\",\"Address\":\"10.0.0.1\",\"Port\":5671,\"Check\":{\"Notes\":\"RabbitMQ Consul-based peer discovery plugin TTL check\",\"TTL\":\"30s\",\"Status\":\"passing\"}}">>, + Node = atom_to_binary(node()), + Expect = 
<<"{\"ID\":\"rabbit:10.0.0.1\",\"Name\":\"rabbit\",\"Address\":\"10.0.0.1\",\"Port\":5671,\"Check\":{\"Notes\":\"RabbitMQ Consul-based peer discovery plugin TTL check\",\"TTL\":\"30s\",\"Status\":\"passing\"},\"Meta\":{\"erlang-node-name\":\"", Node/binary, "\"}}">>, ?assertEqual(Expect, Body), {ok, []} end), @@ -506,7 +523,8 @@ registration_with_acl_token_test(_Config) -> ?assertEqual("v1/agent/service/register", Path), ?assertEqual([], Args), ?assertEqual([], Headers), - Expect = <<"{\"ID\":\"rabbit:10.0.0.1\",\"Name\":\"rabbit\",\"Address\":\"10.0.0.1\",\"Port\":5671,\"Check\":{\"Notes\":\"RabbitMQ Consul-based peer discovery plugin TTL check\",\"TTL\":\"30s\",\"Status\":\"passing\"}}">>, + Node = atom_to_binary(node()), + Expect = <<"{\"ID\":\"rabbit:10.0.0.1\",\"Name\":\"rabbit\",\"Address\":\"10.0.0.1\",\"Port\":5671,\"Check\":{\"Notes\":\"RabbitMQ Consul-based peer discovery plugin TTL check\",\"TTL\":\"30s\",\"Status\":\"passing\"},\"Meta\":{\"erlang-node-name\":\"", Node/binary, "\"}}">>, ?assertEqual(Expect, Body), {ok, []} end), @@ -531,7 +549,8 @@ registration_with_auto_addr_test(_Config) -> ?assertEqual("v1/agent/service/register", Path), ?assertEqual([], Args), ?assertEqual([{"X-Consul-Token", "token-value"}], Headers), - Expect = <<"{\"ID\":\"rabbitmq:bob\",\"Name\":\"rabbitmq\",\"Address\":\"bob\",\"Port\":5672,\"Check\":{\"Notes\":\"RabbitMQ Consul-based peer discovery plugin TTL check\",\"TTL\":\"30s\",\"Status\":\"passing\"}}">>, + Node = atom_to_binary(node()), + Expect = <<"{\"ID\":\"rabbitmq:bob\",\"Name\":\"rabbitmq\",\"Address\":\"bob\",\"Port\":5672,\"Check\":{\"Notes\":\"RabbitMQ Consul-based peer discovery plugin TTL check\",\"TTL\":\"30s\",\"Status\":\"passing\"},\"Meta\":{\"erlang-node-name\":\"", Node/binary, "\"}}">>, ?assertEqual(Expect, Body), {ok, []} end), @@ -555,7 +574,8 @@ registration_with_auto_addr_from_nodename_test(_Config) -> ?assertEqual("v1/agent/service/register", Path), ?assertEqual([], Args), 
?assertEqual([{"X-Consul-Token", "token-value"}], Headers), - Expect = <<"{\"ID\":\"rabbitmq:bob.consul.node\",\"Name\":\"rabbitmq\",\"Address\":\"bob.consul.node\",\"Port\":5672,\"Check\":{\"Notes\":\"RabbitMQ Consul-based peer discovery plugin TTL check\",\"TTL\":\"30s\",\"Status\":\"passing\"}}">>, + Node = atom_to_binary(node()), + Expect = <<"{\"ID\":\"rabbitmq:bob.consul.node\",\"Name\":\"rabbitmq\",\"Address\":\"bob.consul.node\",\"Port\":5672,\"Check\":{\"Notes\":\"RabbitMQ Consul-based peer discovery plugin TTL check\",\"TTL\":\"30s\",\"Status\":\"passing\"},\"Meta\":{\"erlang-node-name\":\"", Node/binary, "\"}}">>, ?assertEqual(Expect, Body), {ok, []} end), @@ -583,7 +603,8 @@ registration_with_auto_addr_nic_test(_Config) -> ?assertEqual("v1/agent/service/register", Path), ?assertEqual([], Args), ?assertEqual([{"X-Consul-Token", "token-value"}], Headers), - Expect = <<"{\"ID\":\"rabbitmq:172.16.4.50\",\"Name\":\"rabbitmq\",\"Address\":\"172.16.4.50\",\"Port\":5672,\"Check\":{\"Notes\":\"RabbitMQ Consul-based peer discovery plugin TTL check\",\"TTL\":\"30s\",\"Status\":\"passing\"}}">>, + Node = atom_to_binary(node()), + Expect = <<"{\"ID\":\"rabbitmq:172.16.4.50\",\"Name\":\"rabbitmq\",\"Address\":\"172.16.4.50\",\"Port\":5672,\"Check\":{\"Notes\":\"RabbitMQ Consul-based peer discovery plugin TTL check\",\"TTL\":\"30s\",\"Status\":\"passing\"},\"Meta\":{\"erlang-node-name\":\"", Node/binary, "\"}}">>, ?assertEqual(Expect, Body), {ok, []} end), @@ -611,7 +632,8 @@ registration_with_auto_addr_nic_issue_12_test(_Config) -> ?assertEqual("v1/agent/service/register", Path), ?assertEqual([], Args), ?assertEqual([{"X-Consul-Token", "token-value"}], Headers), - Expect = <<"{\"ID\":\"rabbitmq:172.16.4.50\",\"Name\":\"rabbitmq\",\"Address\":\"172.16.4.50\",\"Port\":5672,\"Check\":{\"Notes\":\"RabbitMQ Consul-based peer discovery plugin TTL check\",\"TTL\":\"30s\",\"Status\":\"passing\"}}">>, + Node = atom_to_binary(node()), + Expect = 
<<"{\"ID\":\"rabbitmq:172.16.4.50\",\"Name\":\"rabbitmq\",\"Address\":\"172.16.4.50\",\"Port\":5672,\"Check\":{\"Notes\":\"RabbitMQ Consul-based peer discovery plugin TTL check\",\"TTL\":\"30s\",\"Status\":\"passing\"},\"Meta\":{\"erlang-node-name\":\"", Node/binary, "\"}}">>, ?assertEqual(Expect, Body), {ok, []} end), From 27ed4d2c562ab4cbfc1ca16e42987199a28e682a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Wed, 24 Apr 2024 18:32:36 +0200 Subject: [PATCH 5/9] rabbitmq_peer_discovery_consul: Add clustering testcases [Why] Add a `system_SUITE` testsuite, copied from rabbitmq_peer_discovery_etcd, that attempts to start a RabbitMQ cluster where nodes use a Consul server to discover each other. [How] The new testcases try to create a cluster using the local Consul node started by the testsuite. The first one starts one RabbitMQ node at a time. The second one starts all of them concurrently. While here, use the Consul source code added as a Git submodule in a previous commit to compile Consul locally just for the testsuite. 
--- .../BUILD.bazel | 5 + deps/rabbitmq_peer_discovery_consul/app.bzl | 9 + .../test/system_SUITE.erl | 311 ++++++++++++++++++ .../test/system_SUITE_data/.gitignore | 1 + .../test/system_SUITE_data/consul.hcl | 31 ++ 5 files changed, 357 insertions(+) create mode 100644 deps/rabbitmq_peer_discovery_consul/test/system_SUITE.erl create mode 100644 deps/rabbitmq_peer_discovery_consul/test/system_SUITE_data/.gitignore create mode 100644 deps/rabbitmq_peer_discovery_consul/test/system_SUITE_data/consul.hcl diff --git a/deps/rabbitmq_peer_discovery_consul/BUILD.bazel b/deps/rabbitmq_peer_discovery_consul/BUILD.bazel index 5a047f7d67a6..11e70ad3e34f 100644 --- a/deps/rabbitmq_peer_discovery_consul/BUILD.bazel +++ b/deps/rabbitmq_peer_discovery_consul/BUILD.bazel @@ -79,6 +79,11 @@ rabbitmq_integration_suite( name = "config_schema_SUITE", ) +rabbitmq_integration_suite( + name = "system_SUITE", + size = "large", +) + rabbitmq_suite( name = "rabbitmq_peer_discovery_consul_SUITE", size = "medium", diff --git a/deps/rabbitmq_peer_discovery_consul/app.bzl b/deps/rabbitmq_peer_discovery_consul/app.bzl index 7a7de92884a0..6eebe03a85eb 100644 --- a/deps/rabbitmq_peer_discovery_consul/app.bzl +++ b/deps/rabbitmq_peer_discovery_consul/app.bzl @@ -99,6 +99,15 @@ def test_suite_beam_files(name = "test_suite_beam_files"): app_name = "rabbitmq_peer_discovery_consul", erlc_opts = "//:test_erlc_opts", ) + erlang_bytecode( + name = "system_SUITE_beam_files", + testonly = True, + srcs = ["test/system_SUITE.erl"], + outs = ["test/system_SUITE.beam"], + hdrs = [], + app_name = "rabbitmq_peer_discovery_consul", + erlc_opts = "//:test_erlc_opts", + ) erlang_bytecode( name = "rabbitmq_peer_discovery_consul_SUITE_beam_files", testonly = True, diff --git a/deps/rabbitmq_peer_discovery_consul/test/system_SUITE.erl b/deps/rabbitmq_peer_discovery_consul/test/system_SUITE.erl new file mode 100644 index 000000000000..748b168118ff --- /dev/null +++ 
b/deps/rabbitmq_peer_discovery_consul/test/system_SUITE.erl @@ -0,0 +1,311 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2024 Broadcom. The term “Broadcom” refers to Broadcom Inc. +%% and/or its subsidiaries. All rights reserved. All rights reserved. +%% + +-module(system_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-export([all/0, + groups/0, + init_per_suite/1, + end_per_suite/1, + init_per_group/2, + end_per_group/2, + init_per_testcase/2, + end_per_testcase/2, + + start_one_member_at_a_time/1, + start_members_concurrently/1]). + +-define(CONSUL_GIT_REPO, "https://github.com/hashicorp/consul.git"). +-define(CONSUL_GIT_REF, "v1.18.1"). + +all() -> + [ + {group, clustering} + ]. + +groups() -> + [ + {clustering, [], [start_one_member_at_a_time, + start_members_concurrently]} + ]. + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps( + Config, + [fun clone_consul/1, + fun compile_consul/1, + fun config_consul/1, + fun start_consul/1]). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, [fun stop_consul/1]). + +init_per_group(clustering, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{rmq_nodes_count, 3}, + {rmq_nodes_clustered, false}]); +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, Config) -> + Config. 
+ +init_per_testcase(Testcase, Config) + when Testcase =:= start_one_member_at_a_time orelse + Testcase =:= start_members_concurrently -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase), + ClusterSize = ?config(rmq_nodes_count, Config), + Config1 = rabbit_ct_helpers:set_config( + Config, + [{rmq_nodename_suffix, Testcase}, + {tcp_ports_base, {skip_n_nodes, + TestNumber * ClusterSize}} + ]), + Config2 = rabbit_ct_helpers:merge_app_env( + Config1, {rabbit, [{log, [{file, [{level, debug}]}]}]}), + Config3 = rabbit_ct_helpers:run_steps( + Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()), + try + _ = rabbit_ct_broker_helpers:rpc_all( + Config3, rabbit_peer_discovery_backend, api_version, []), + Config3 + catch + error:{exception, undef, + [{rabbit_peer_discovery_backend, api_version, _, _} | _]} -> + Config4 = rabbit_ct_helpers:run_steps( + Config3, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config4, Testcase), + {skip, + "Some nodes use the old discover->register order; " + "the testcase would likely fail"} + end; +init_per_testcase(_Testcase, Config) -> + Config. + +end_per_testcase(Testcase, Config) + when Testcase =:= start_one_member_at_a_time orelse + Testcase =:= start_members_concurrently -> + Config1 = rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase); +end_per_testcase(_Testcase, Config) -> + Config. 
+ +clone_consul(Config) -> + DataDir = ?config(data_dir, Config), + ConsulSrcdir = filename:join(DataDir, "consul"), + Cmd = case filelib:is_dir(ConsulSrcdir) of + true -> + ct:pal( + "Checking out Consul Git reference, ref = ~s", + [?CONSUL_GIT_REF]), + ["git", "-C", ConsulSrcdir, + "checkout", ?CONSUL_GIT_REF]; + false -> + ct:pal( + "Cloning Consul Git repository, ref = ~s", + [?CONSUL_GIT_REF]), + ["git", "clone", + "--branch", ?CONSUL_GIT_REF, + ?CONSUL_GIT_REPO, ConsulSrcdir] + end, + case rabbit_ct_helpers:exec(Cmd) of + {ok, _} -> + rabbit_ct_helpers:set_config( + Config, {consul_srcdir, ConsulSrcdir}); + {error, _} -> + {skip, "Failed to clone Consul"} + end. + +compile_consul(Config) -> + ConsulSrcdir = ?config(consul_srcdir, Config), + ct:pal("Compiling Consul in ~ts", [ConsulSrcdir]), + Cmd = ["go", "install"], + GOPATH = filename:join(ConsulSrcdir, "go"), + GOFLAGS = "-modcacherw", + Options = [{cd, ConsulSrcdir}, + {env, [{"BINDIR", false}, + {"GOPATH", GOPATH}, + {"GOFLAGS", GOFLAGS}]}], + case rabbit_ct_helpers:exec(Cmd, Options) of + {ok, _} -> + ConsulExe = case os:type() of + {win32, _} -> "consul.exe"; + _ -> "consul" + end, + ConsulBin = filename:join([GOPATH, "bin", ConsulExe]), + ?assert(filelib:is_regular(ConsulBin)), + rabbit_ct_helpers:set_config(Config, {consul_bin, ConsulBin}); + {error, _} -> + {skip, "Failed to compile Consul"} + end. 
+ +config_consul(Config) -> + DataDir = ?config(data_dir, Config), + PrivDir = ?config(priv_dir, Config), + ConsulConfDir = filename:join(PrivDir, "conf.consul"), + ConsulDataDir = filename:join(PrivDir, "data.consul"), + ConsulHost = "localhost", + ConsulTcpPort = 8500, + + ConsulConfTpl = filename:join(DataDir, "consul.hcl"), + {ok, ConsulConf0} = file:read_file(ConsulConfTpl), + ConsulConf1 = io_lib:format( + "~ts~n" + "node_name = \"~ts\"~n" + "domain = \"~ts\"~n" + "data_dir = \"~ts\"~n" + "ports {~n" + " http = ~b~n" + " grpc = -1~n" + "}~n", + [ConsulConf0, ConsulHost, ConsulHost, ConsulDataDir, + ConsulTcpPort]), + ConsulConfFile = filename:join(ConsulConfDir, "consul.hcl"), + ok = file:make_dir(ConsulConfDir), + ok = file:write_file(ConsulConfFile, ConsulConf1), + rabbit_ct_helpers:set_config( + Config, + [{consul_conf_dir, ConsulConfDir}, + {consul_host, ConsulHost}, + {consul_tcp_port, ConsulTcpPort}]). + +start_consul(Config) -> + ct:pal("Starting Consul daemon"), + ConsulBin = ?config(consul_bin, Config), + ConsulConfDir = ?config(consul_conf_dir, Config), + Cmd = [ConsulBin, "agent", "-config-dir", ConsulConfDir], + ConsulPid = spawn(fun() -> rabbit_ct_helpers:exec(Cmd) end), + rabbit_ct_helpers:set_config(Config, {consul_pid, ConsulPid}). + +stop_consul(Config) -> + case rabbit_ct_helpers:get_config(Config, consul_pid) of + ConsulPid when is_pid(ConsulPid) -> + ct:pal( + "Stopping Consul daemon by killing control process ~p", + [ConsulPid]), + erlang:exit(ConsulPid, kill), + _ = case os:type() of + {win32, _} -> ok; + _ -> rabbit_ct_helpers:exec(["pkill", "consul"]) + end; + undefined -> + ok + end, + Config. + +%% +%% Test cases +%% + +start_one_member_at_a_time(Config) -> + Config1 = configure_peer_discovery(Config), + + Nodes = rabbit_ct_broker_helpers:get_node_configs(Config1, nodename), + lists:foreach( + fun(Node) -> + ?assertEqual( + ok, + rabbit_ct_broker_helpers:start_node(Config1, Node)) + end, Nodes), + + assert_full_cluster(Config1). 
+ +start_members_concurrently(Config) -> + Config1 = configure_peer_discovery(Config), + + Nodes = rabbit_ct_broker_helpers:get_node_configs(Config1, nodename), + Parent = self(), + Pids = lists:map( + fun(Node) -> + spawn_link( + fun() -> + receive + go -> + ?assertEqual( + ok, + rabbit_ct_broker_helpers:start_node( + Config1, Node)), + Parent ! started + end + end) + end, Nodes), + + lists:foreach(fun(Pid) -> Pid ! go end, Pids), + lists:foreach(fun(_Pid) -> receive started -> ok end end, Pids), + + assert_full_cluster(Config1). + +configure_peer_discovery(Config) -> + Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + lists:foreach( + fun(Node) -> + Members = lists:sort( + rabbit_ct_broker_helpers:cluster_members_online( + Config, Node)), + ?assertEqual([Node], Members) + end, Nodes), + + lists:foreach( + fun(Node) -> + ?assertEqual( + ok, + rabbit_ct_broker_helpers:stop_broker(Config, Node)), + ?assertEqual( + ok, + rabbit_ct_broker_helpers:reset_node(Config, Node)), + ?assertEqual( + ok, + rabbit_ct_broker_helpers:stop_node(Config, Node)) + end, Nodes), + + ConsulHost = ?config(consul_host, Config), + ConsulTcpPort = ?config(consul_tcp_port, Config), + lists:foreach( + fun(Node) -> + Config1 = rabbit_ct_helpers:merge_app_env( + Config, + {rabbit, + [{cluster_formation, + [{peer_discovery_backend, + rabbit_peer_discovery_consul}, + {peer_discovery_consul, + [{consul_svc_id, atom_to_list(Node)}, + {consul_host, ConsulHost}, + {consul_port, ConsulTcpPort}, + {consul_scheme, "http"}]}]}]}), + ?assertEqual( + ok, + rabbit_ct_broker_helpers:rewrite_node_config_file( + Config1, Node)) + end, Nodes), + + Config. + +assert_full_cluster(Config) -> + Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + ExpectedMembers = lists:sort(Nodes), + lists:foreach( + fun(Node) -> + Members = lists:sort( + rabbit_ct_broker_helpers:cluster_members_online( + Config, Node)), + ?assertEqual(ExpectedMembers, Members) + end, Nodes). 
diff --git a/deps/rabbitmq_peer_discovery_consul/test/system_SUITE_data/.gitignore b/deps/rabbitmq_peer_discovery_consul/test/system_SUITE_data/.gitignore new file mode 100644 index 000000000000..75ab1a997deb --- /dev/null +++ b/deps/rabbitmq_peer_discovery_consul/test/system_SUITE_data/.gitignore @@ -0,0 +1 @@ +/consul/ diff --git a/deps/rabbitmq_peer_discovery_consul/test/system_SUITE_data/consul.hcl b/deps/rabbitmq_peer_discovery_consul/test/system_SUITE_data/consul.hcl new file mode 100644 index 000000000000..377132117ee6 --- /dev/null +++ b/deps/rabbitmq_peer_discovery_consul/test/system_SUITE_data/consul.hcl @@ -0,0 +1,31 @@ +log_level = "DEBUG" +enable_syslog = false +enable_script_checks = false +enable_local_script_checks = true + +datacenter = "dc1" +server = true +bootstrap_expect = 1 + +## ACL configuration +acl = { + enabled = true + default_policy = "allow" + enable_token_persistence = true + enable_token_replication = true + down_policy = "extend-cache" +} + +# Enable service mesh +connect { + enabled = true +} + +# Addresses and ports +client_addr = "0.0.0.0" +bind_addr = "0.0.0.0" + +addresses { + grpc = "0.0.0.0" + http = "0.0.0.0" +} From a56d82cb72b207b7f97f587bc45889d2310aa308 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Fri, 26 Apr 2024 18:05:34 +0200 Subject: [PATCH 6/9] rabbitmq_peer_discovery_consul: Handle locking inside `list_nodes/0` [Why] The new implementation of `rabbit_peer_discovery` acquires the lock only when a node needs to join another one. This is meant to disappear in the medium/long term anyway. Here, we need to lock the query to Consul to make sure that queries happen sequentially, not concurrently. This is a work in progress and we may not keep it either. 
--- .../src/rabbit_peer_discovery_consul.erl | 62 ++++++++++++------- .../src/rabbitmq_peer_discovery_consul.erl | 2 +- .../rabbitmq_peer_discovery_consul_SUITE.erl | 54 ++++++++++++++++ 3 files changed, 96 insertions(+), 22 deletions(-) diff --git a/deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl b/deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl index db3699e304e1..b1bfeafb9d88 100644 --- a/deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl +++ b/deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl @@ -68,21 +68,30 @@ list_nodes() -> {ok, {[], disc}} end, Fun2 = fun(Proplist) -> - M = maps:from_list(Proplist), - Path = rabbit_peer_discovery_httpc:build_path([v1, health, service, service_name()]), - HttpOpts = http_options(M), - case rabbit_peer_discovery_httpc:get(get_config_key(consul_scheme, M), - get_config_key(consul_host, M), - get_integer_config_key(consul_port, M), - Path, - list_nodes_query_args(), - maybe_add_acl([]), - HttpOpts) of - {ok, Nodes} -> - IncludeWithWarnings = get_config_key(consul_include_nodes_with_warnings, M), - Result = extract_nodes( - filter_nodes(Nodes, IncludeWithWarnings)), - {ok, {Result, disc}}; + case internal_lock() of + {ok, Priv} -> + try + M = maps:from_list(Proplist), + Path = rabbit_peer_discovery_httpc:build_path([v1, health, service, service_name()]), + HttpOpts = http_options(M), + case rabbit_peer_discovery_httpc:get(get_config_key(consul_scheme, M), + get_config_key(consul_host, M), + get_integer_config_key(consul_port, M), + Path, + list_nodes_query_args(), + maybe_add_acl([]), + HttpOpts) of + {ok, Nodes} -> + IncludeWithWarnings = get_config_key(consul_include_nodes_with_warnings, M), + Result = extract_nodes( + filter_nodes(Nodes, IncludeWithWarnings)), + {ok, {Result, disc}}; + {error, _} = Error -> + Error + end + after + internal_unlock(Priv) + end; {error, _} = Error -> Error end @@ -164,9 +173,20 @@ post_registration() 
-> ok. -spec lock(Nodes :: [node()]) -> - {ok, Data :: term()} | {error, Reason :: string()}. + not_supported. lock(_Nodes) -> + not_supported. + +-spec unlock(Data :: term()) -> ok. + +unlock(_Data) -> + ok. + +-spec internal_lock() -> + {ok, Data :: term()} | {error, Reason :: string()}. + +internal_lock() -> M = ?CONFIG_MODULE:config_map(?BACKEND_CONFIG_KEY), ?LOG_DEBUG( "Effective Consul peer discovery configuration: ~tp", [M], @@ -179,13 +199,13 @@ lock(_Nodes) -> EndTime = Now + get_config_key(lock_wait_time, M), lock(TRef, SessionId, Now, EndTime); {error, Reason} -> - {error, lists:flatten(io_lib:format("Error while creating a session, reason: ~ts", + {error, lists:flatten(io_lib:format("Error while creating a session, reason: ~0p", [Reason]))} end. --spec unlock({SessionId :: string(), TRef :: timer:tref()}) -> ok. +-spec internal_unlock({SessionId :: string(), TRef :: timer:tref()}) -> ok. -unlock({SessionId, TRef}) -> +internal_unlock({SessionId, TRef}) -> _ = timer:cancel(TRef), ?LOG_DEBUG( "Stopped session renewal", @@ -620,7 +640,7 @@ wait_for_list_nodes(N) -> %% Create a session to be acquired for a common key %% @end %%-------------------------------------------------------------------- --spec create_session(atom(), pos_integer()) -> {ok, string()} | {error, Reason::string()}. +-spec create_session(atom(), pos_integer()) -> {ok, string()} | {error, Reason::any()}. create_session(Name, TTL) -> case consul_session_create([], maybe_add_acl([]), [{'Name', Name}, @@ -705,7 +725,7 @@ start_session_ttl_updater(SessionId) -> %% Tries to acquire lock. If the lock is held by someone else, waits until it %% is released, or too much time has passed %% @end --spec lock(timer:tref(), string(), pos_integer(), pos_integer()) -> {ok, string()} | {error, string()}. +-spec lock(timer:tref(), string(), pos_integer(), pos_integer()) -> {ok, {SessionId :: string(), TRef :: timer:tref()}} | {error, string()}. 
lock(TRef, _, Now, EndTime) when EndTime < Now -> _ = timer:cancel(TRef), {error, "Acquiring lock taking too long, bailing out"}; diff --git a/deps/rabbitmq_peer_discovery_consul/src/rabbitmq_peer_discovery_consul.erl b/deps/rabbitmq_peer_discovery_consul/src/rabbitmq_peer_discovery_consul.erl index 649af8f25f9f..ab889b8edb44 100644 --- a/deps/rabbitmq_peer_discovery_consul/src/rabbitmq_peer_discovery_consul.erl +++ b/deps/rabbitmq_peer_discovery_consul/src/rabbitmq_peer_discovery_consul.erl @@ -42,7 +42,7 @@ unregister() -> post_registration() -> ?DELEGATE:post_registration(). --spec lock(Nodes :: [node()]) -> {ok, Data :: term()} | {error, Reason :: string()}. +-spec lock(Nodes :: [node()]) -> not_supported. lock(Node) -> ?DELEGATE:lock(Node). diff --git a/deps/rabbitmq_peer_discovery_consul/test/rabbitmq_peer_discovery_consul_SUITE.erl b/deps/rabbitmq_peer_discovery_consul/test/rabbitmq_peer_discovery_consul_SUITE.erl index 6b53798e96e4..36fb01a7ca68 100644 --- a/deps/rabbitmq_peer_discovery_consul/test/rabbitmq_peer_discovery_consul_SUITE.erl +++ b/deps/rabbitmq_peer_discovery_consul/test/rabbitmq_peer_discovery_consul_SUITE.erl @@ -345,6 +345,15 @@ list_nodes_return_value_basic_test(_Config) -> {consul_port, 8500} ]} ]), + meck:expect(rabbit_peer_discovery_httpc, put, + fun + (_, _, _, "v1/session/create", _, _, _, _) -> + Body = "{\"ID\":\"some-session-id\"}", + rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body)); + (_, _, _, "v1/kv/rabbitmq/default/startup_lock", _, _, _, _) -> + Body = "true", + rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body)) + end), meck:expect(rabbit_peer_discovery_httpc, get, fun(_, _, _, _, _, _, _) -> Body = "[{\"Node\": {\"Node\": \"rabbit2.internal.domain\", \"Address\": \"10.20.16.160\"}, \"Checks\": [{\"Node\": \"rabbit2.internal.domain\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", 
\"Status\": \"passing\", \"ServiceID\": \"rabbitmq\", \"Output\": \"\"}, {\"Node\": \"rabbit2.internal.domain\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"\", \"Port\": 5672, \"ID\": \"rabbitmq\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}, {\"Node\": {\"Node\": \"rabbit1.internal.domain\", \"Address\": \"10.20.16.159\"}, \"Checks\": [{\"Node\": \"rabbit1.internal.domain\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq\", \"Output\": \"\"}, {\"Node\": \"rabbit1.internal.domain\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"\", \"Port\": 5672, \"ID\": \"rabbitmq\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}]", @@ -364,6 +373,15 @@ list_nodes_return_value_basic_long_node_name_test(_Config) -> {consul_port, 8500} ]} ]), + meck:expect(rabbit_peer_discovery_httpc, put, + fun + (_, _, _, "v1/session/create", _, _, _, _) -> + Body = "{\"ID\":\"some-session-id\"}", + rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body)); + (_, _, _, "v1/kv/rabbitmq/default/startup_lock", _, _, _, _) -> + Body = "true", + rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body)) + end), meck:expect(rabbit_peer_discovery_httpc, get, fun(_, _, _, _, _, _, _) -> Body = "[{\"Node\": {\"Node\": \"rabbit2\", \"Address\": \"10.20.16.160\"}, \"Checks\": [{\"Node\": \"rabbit2\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", 
\"Status\": \"passing\", \"ServiceID\": \"rabbitmq\", \"Output\": \"\"}, {\"Node\": \"rabbit2\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"\", \"Port\": 5672, \"ID\": \"rabbitmq\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}, {\"Node\": {\"Node\": \"rabbit1\", \"Address\": \"10.20.16.159\"}, \"Checks\": [{\"Node\": \"rabbit1\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq\", \"Output\": \"\"}, {\"Node\": \"rabbit1\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"\", \"Port\": 5672, \"ID\": \"rabbitmq\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}]", @@ -384,6 +402,15 @@ list_nodes_return_value_long_node_name_and_custom_domain_test(_Config) -> {consul_domain, "internal"} ]} ]), + meck:expect(rabbit_peer_discovery_httpc, put, + fun + (_, _, _, "v1/session/create", _, _, _, _) -> + Body = "{\"ID\":\"some-session-id\"}", + rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body)); + (_, _, _, "v1/kv/rabbitmq/default/startup_lock", _, _, _, _) -> + Body = "true", + rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body)) + end), meck:expect(rabbit_peer_discovery_httpc, get, fun(_, _, _, _, _, _, _) -> Body = "[{\"Node\": {\"Node\": \"rabbit2\", \"Address\": \"10.20.16.160\"}, \"Checks\": [{\"Node\": \"rabbit2\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": 
\"rabbitmq\", \"Output\": \"\"}, {\"Node\": \"rabbit2\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"\", \"Port\": 5672, \"ID\": \"rabbitmq\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}, {\"Node\": {\"Node\": \"rabbit1\", \"Address\": \"10.20.16.159\"}, \"Checks\": [{\"Node\": \"rabbit1\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq\", \"Output\": \"\"}, {\"Node\": \"rabbit1\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"\", \"Port\": 5672, \"ID\": \"rabbitmq\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}]", @@ -405,6 +432,15 @@ list_nodes_return_value_srv_address_test(_Config) -> {consul_port, 8500} ]} ]), + meck:expect(rabbit_peer_discovery_httpc, put, + fun + (_, _, _, "v1/session/create", _, _, _, _) -> + Body = "{\"ID\":\"some-session-id\"}", + rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body)); + (_, _, _, "v1/kv/rabbitmq/default/startup_lock", _, _, _, _) -> + Body = "true", + rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body)) + end), meck:expect(rabbit_peer_discovery_httpc, get, fun(_, _, _, _, _, _, _) -> Body = "[{\"Node\": {\"Node\": \"rabbit2.internal.domain\", \"Address\": \"10.20.16.160\"}, \"Checks\": [{\"Node\": \"rabbit2.internal.domain\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq:172.172.16.4.50\", \"Output\": 
\"\"}, {\"Node\": \"rabbit2.internal.domain\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"172.16.4.51\", \"Port\": 5672, \"ID\": \"rabbitmq:172.16.4.51\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}, {\"Node\": {\"Node\": \"rabbit1.internal.domain\", \"Address\": \"10.20.16.159\"}, \"Checks\": [{\"Node\": \"rabbit1.internal.domain\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq\", \"Output\": \"\"}, {\"Node\": \"rabbit1.internal.domain\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"172.172.16.51\", \"Port\": 5672, \"ID\": \"rabbitmq:172.172.16.51\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}]", @@ -423,6 +459,15 @@ list_nodes_return_value_nodes_in_warning_state_included_test(_Config) -> {consul_port, 8500} ]} ]), + meck:expect(rabbit_peer_discovery_httpc, put, + fun + (_, _, _, "v1/session/create", _, _, _, _) -> + Body = "{\"ID\":\"some-session-id\"}", + rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body)); + (_, _, _, "v1/kv/rabbitmq/default/startup_lock", _, _, _, _) -> + Body = "true", + rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body)) + end), meck:expect(rabbit_peer_discovery_httpc, get, fun(_, _, _, _, [], _, _) -> rabbit_json:try_decode(list_of_nodes_with_warnings()); @@ -443,6 +488,15 @@ list_nodes_return_value_nodes_in_warning_state_filtered_out_test(_Config) -> {consul_port, 8500} ]} ]), + meck:expect(rabbit_peer_discovery_httpc, put, + fun + (_, _, _, "v1/session/create", _, _, _, _) -> + Body = 
"{\"ID\":\"some-session-id\"}", + rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body)); + (_, _, _, "v1/kv/rabbitmq/default/startup_lock", _, _, _, _) -> + Body = "true", + rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body)) + end), meck:expect(rabbit_peer_discovery_httpc, get, fun(_, _, _, _, [], _, _) -> rabbit_json:try_decode(list_of_nodes_with_warnings()); From cb9f0d8a44187fdd1274cb635e33b6b4d2d0235a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Fri, 19 Apr 2024 15:26:15 +0200 Subject: [PATCH 7/9] rabbit_peer_discovery: Register node before running discovery [Why] The two backends that use registration are Consul and etcd. The discovery process relies on the registered nodes: they return whatever was previously registered. With the new checks and failsafes added in peer discovery in RabbitMQ 3.13.0, the fact that registration happens after running discovery breaks Consul and etcd backend. It used to work before because the first node would eventually time out waiting for a non-empty list of nodes from the backend and proceed as a standalone node, registering itself on the way. Following nodes would then discover that first node. Among the new checks, the node running discovery expects to find itself in the list of discovered nodes. Because it didn't register yet, it will never find itself. [How] The solution is to register first, then run discovery. The node should at least get itself in the discovered nodes. 
--- deps/rabbit/src/rabbit_db.erl | 2 +- deps/rabbit_common/src/rabbit_peer_discovery_backend.erl | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_db.erl b/deps/rabbit/src/rabbit_db.erl index adccf67828b9..0a552adeb1d8 100644 --- a/deps/rabbit/src/rabbit_db.erl +++ b/deps/rabbit/src/rabbit_db.erl @@ -52,6 +52,7 @@ init() -> ensure_dir_exists(), rabbit_peer_discovery:maybe_init(), + rabbit_peer_discovery:maybe_register(), pre_init(IsVirgin), @@ -77,7 +78,6 @@ init() -> "DB: initialization successeful", #{domain => ?RMQLOG_DOMAIN_DB}), - rabbit_peer_discovery:maybe_register(), init_finished(), ok; diff --git a/deps/rabbit_common/src/rabbit_peer_discovery_backend.erl b/deps/rabbit_common/src/rabbit_peer_discovery_backend.erl index d7509b78542f..19c3ccf15cc2 100644 --- a/deps/rabbit_common/src/rabbit_peer_discovery_backend.erl +++ b/deps/rabbit_common/src/rabbit_peer_discovery_backend.erl @@ -57,3 +57,8 @@ -callback unlock(Data :: term()) -> ok. -optional_callbacks([init/0]). + +-export([api_version/0]). + +api_version() -> + 2. From 3147ab7d47537bcb5114e8555d783629644afc0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Thu, 2 May 2024 14:52:01 +0200 Subject: [PATCH 8/9] rabbit_peer_discovery: Allow backends to select the node to join themselves [Why] Before, the backend would always return a list of nodes and the subsystem would select one based on their uptimes, the nodes they are already clustered with, and the readiness of their database. This works well in general but has some limitations. For instance with the Consul backend, the discoverability of nodes depends on when each one registered and in which order. Therefore, the node with the highest uptime might not be the first that registers. In this case, the one that registers first will only discover itself and boot as a standalone node. However, the one with the highest uptime that registered after will discover both nodes. 
It will then select itself as the node to join because it has the highest uptime. In the end both nodes form distinct clusters. Another example is the Kubernetes backend. The current solution works fine but it could be optimized: the backend knows we always want to join the first node ("$node-0") regardless of the order in which they are started because picking the first node alphabetically is fine. Therefore we want to let the backend select the node to join if it wants. [How] The `list_nodes()` callback can now return the following term: {ok, {SelectedNode :: node(), NodeType}} If the subsystem sees this return value, it will consider that the returned node is the one to join. It will still query properties because we want to make sure the node's database is ready before joining it. --- deps/rabbit/src/rabbit_peer_discovery.erl | 43 +++++++++++++++---- .../src/rabbit_peer_discovery_backend.erl | 2 +- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/deps/rabbit/src/rabbit_peer_discovery.erl b/deps/rabbit/src/rabbit_peer_discovery.erl index 7d49d087dc9c..df6731bd2abf 100644 --- a/deps/rabbit/src/rabbit_peer_discovery.erl +++ b/deps/rabbit/src/rabbit_peer_discovery.erl @@ -186,8 +186,22 @@ sync_desired_cluster(Backend, RetriesLeft, RetryDelay) -> #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), ok; {ok, {DiscoveredNodes, NodeType}} -> - NodesAndProps = query_node_props(DiscoveredNodes), - case can_use_discovered_nodes(DiscoveredNodes, NodesAndProps) of + NodeAlreadySelected = is_atom(DiscoveredNodes), + NodesAndProps = case NodeAlreadySelected of + true -> + ?LOG_DEBUG( + "Peer discovery: node '~ts' already " + "selected by backend", + [DiscoveredNodes], + #{domain => ?RMQLOG_DOMAIN_PEER_DISC}), + query_node_props([DiscoveredNodes]); + false -> + query_node_props(DiscoveredNodes) + end, + CanUse = ( + NodeAlreadySelected orelse + can_use_discovered_nodes(DiscoveredNodes, NodesAndProps)), + case CanUse of true -> case select_node_to_join(NodesAndProps) of
SelectedNode when SelectedNode =/= false -> @@ -249,8 +263,9 @@ retry_sync_desired_cluster(_Backend, 0, _RetryDelay) -> ok. -spec discover_cluster_nodes() -> {ok, Discovery} when - Discovery :: {DiscoveredNodes, NodeType}, + Discovery :: {DiscoveredNodes | SelectedNode, NodeType}, DiscoveredNodes :: [node()], + SelectedNode :: node(), NodeType :: rabbit_types:node_type(). %% @doc Queries the peer discovery backend to discover nodes. %% @@ -262,10 +277,11 @@ discover_cluster_nodes() -> -spec discover_cluster_nodes(Backend) -> Ret when Backend :: backend(), - Discovery :: {DiscoveredNodes, NodeType}, + Ret :: {ok, Discovery} | {error, Reason}, + Discovery :: {DiscoveredNodes | SelectedNode, NodeType}, DiscoveredNodes :: [node()], + SelectedNode :: node(), NodeType :: rabbit_types:node_type(), - Ret :: {ok, Discovery} | {error, Reason}, Reason :: any(). %% @private @@ -295,7 +311,7 @@ discover_cluster_nodes(Backend) -> -spec check_discovered_nodes_list_validity(DiscoveredNodes, NodeType) -> Ret when - DiscoveredNodes :: [node()], + DiscoveredNodes :: [node()] | node(), NodeType :: rabbit_types:node_type(), Ret :: ok. %% @private @@ -310,6 +326,12 @@ check_discovered_nodes_list_validity(DiscoveredNodes, NodeType) [] -> ok; _ -> e({invalid_cluster_node_names, BadNodenames}) end; +check_discovered_nodes_list_validity(SelectedNode, NodeType) + when NodeType =:= disc orelse NodeType =:= disk orelse NodeType =:= ram -> + case is_atom(SelectedNode) of + true -> ok; + false -> e({invalid_cluster_node_names, SelectedNode}) + end; check_discovered_nodes_list_validity(DiscoveredNodes, BadNodeType) when is_list(DiscoveredNodes) -> e({invalid_cluster_node_type, BadNodeType}). @@ -836,7 +858,7 @@ can_use_discovered_nodes(_DiscoveredNodes, []) -> false. -spec select_node_to_join(NodesAndProps) -> SelectedNode when - NodesAndProps :: [node_and_props()], + NodesAndProps :: nonempty_list(node_and_props()), SelectedNode :: node() | false. 
%% @doc Selects the node to join among the sorted list of nodes. %% @@ -1140,10 +1162,10 @@ unlock(Backend, Data) -> {Nodes :: [node()], NodeType :: rabbit_types:node_type()} | {ok, Nodes :: [node()]} | - {ok, {Nodes :: [node()], + {ok, {Nodes :: [node()] | node(), NodeType :: rabbit_types:node_type()}} | {error, Reason :: string()}) -> - {ok, {Nodes :: [node()], NodeType :: rabbit_types:node_type()}} | + {ok, {Nodes :: [node()] | node(), NodeType :: rabbit_types:node_type()}} | {error, Reason :: string()}. normalize(Nodes) when is_list(Nodes) -> @@ -1154,6 +1176,9 @@ normalize({ok, Nodes}) when is_list(Nodes) -> {ok, {Nodes, disc}}; normalize({ok, {Nodes, NodeType}}) when is_list(Nodes) andalso is_atom(NodeType) -> {ok, {Nodes, NodeType}}; +normalize({ok, {Node, NodeType}}) + when is_atom(Node) andalso is_atom(NodeType) -> + {ok, {Node, NodeType}}; normalize({error, Reason}) -> {error, Reason}. diff --git a/deps/rabbit_common/src/rabbit_peer_discovery_backend.erl b/deps/rabbit_common/src/rabbit_peer_discovery_backend.erl index 19c3ccf15cc2..6f6b2d30b115 100644 --- a/deps/rabbit_common/src/rabbit_peer_discovery_backend.erl +++ b/deps/rabbit_common/src/rabbit_peer_discovery_backend.erl @@ -41,7 +41,7 @@ -callback init() -> ok | {error, Reason :: string()}. --callback list_nodes() -> {ok, {Nodes :: list(), NodeType :: rabbit_types:node_type()}} | +-callback list_nodes() -> {ok, {Nodes :: [node()] | node(), NodeType :: rabbit_types:node_type()}} | {error, Reason :: string()}. -callback supports_registration() -> boolean(). From 0f054e118656d127a0b4eb8dde6568ea07195530 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Thu, 2 May 2024 14:58:40 +0200 Subject: [PATCH 9/9] rabbitmq_peer_discovery_consul: Select the node to join [Why] The default node selection of the peer discovery subsystem doesn't work well with Consul. The reason is that that selection is based on the nodes' uptime. 
However, the node with the highest uptime may not be the first to register in Consul. When this happens, the node that registered first will only discover itself and boot as a standalone node. Then, the node with the highest uptime will discover both of them, but will select itself as the node to join because of its uptime. In the end, we end up with two clusters instead of one. [How] We use the `CreateIndex` property in the Consul response to sort services. We then derive the name of the node to join from the service that has the lowest `CreateIndex`, meaning it was the first to register. --- .../src/rabbit_peer_discovery_consul.erl | 32 +++++++++++++------ .../rabbitmq_peer_discovery_consul_SUITE.erl | 12 +++---- 2 files changed, 28 insertions(+), 16 deletions(-) diff --git a/deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl b/deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl index b1bfeafb9d88..d18176ef4f59 100644 --- a/deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl +++ b/deps/rabbitmq_peer_discovery_consul/src/rabbit_peer_discovery_consul.erl @@ -83,8 +83,9 @@ list_nodes() -> HttpOpts) of {ok, Nodes} -> IncludeWithWarnings = get_config_key(consul_include_nodes_with_warnings, M), - Result = extract_nodes( - filter_nodes(Nodes, IncludeWithWarnings)), + Result = extract_node( + sort_nodes( + filter_nodes(Nodes, IncludeWithWarnings))), {ok, {Result, disc}}; {error, _} = Error -> Error @@ -276,13 +277,24 @@ filter_nodes(Nodes, Warn) -> false -> Nodes end. --spec extract_nodes(ConsulResult :: [#{binary() => term()}]) -> list(). -extract_nodes(Data) -> extract_nodes(Data, []). - --spec extract_nodes(ConsulResult :: [#{binary() => term()}], Nodes :: list()) - -> list(). -extract_nodes([], Nodes) -> Nodes; -extract_nodes([H | T], Nodes) -> +-spec sort_nodes(ConsulResult :: [#{binary() => term()}]) -> [#{binary() => term()}]. 
+sort_nodes(Nodes) -> + lists:sort( + fun(NodeA, NodeB) -> + IndexA = maps:get( + <<"CreateIndex">>, + maps:get(<<"Service">>, NodeA, #{}), undefined), + IndexB = maps:get( + <<"CreateIndex">>, + maps:get(<<"Service">>, NodeB, #{}), undefined), + %% `undefined' is always greater than an integer, so we are fine here. + IndexA =< IndexB + end, Nodes). + +-spec extract_node(ConsulResult :: [#{binary() => term()}]) -> list(). +extract_node([]) -> + []; +extract_node([H | _]) -> Service = maps:get(<<"Service">>, H), Meta = maps:get(<<"Meta">>, Service, #{}), NodeName = case Meta of @@ -299,7 +311,7 @@ extract_nodes([H | T], Nodes) -> ?UTIL_MODULE:node_name(Address) end end, - extract_nodes(T, lists:merge(Nodes, [NodeName])). + NodeName. -spec maybe_add_acl(QArgs :: list()) -> list(). maybe_add_acl(List) -> diff --git a/deps/rabbitmq_peer_discovery_consul/test/rabbitmq_peer_discovery_consul_SUITE.erl b/deps/rabbitmq_peer_discovery_consul/test/rabbitmq_peer_discovery_consul_SUITE.erl index 36fb01a7ca68..23a069dae9dc 100644 --- a/deps/rabbitmq_peer_discovery_consul/test/rabbitmq_peer_discovery_consul_SUITE.erl +++ b/deps/rabbitmq_peer_discovery_consul/test/rabbitmq_peer_discovery_consul_SUITE.erl @@ -360,7 +360,7 @@ list_nodes_return_value_basic_test(_Config) -> rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body)) end), meck:expect(rabbit_nodes, name_type, fun() -> shortnames end), - ?assertEqual({ok, {['rabbit@rabbit1', 'rabbit@rabbit2'], disc}}, + ?assertEqual({ok, {'rabbit@rabbit2', disc}}, rabbit_peer_discovery_consul:list_nodes()), ?assert(meck:validate(rabbit_peer_discovery_httpc)). 
@@ -388,7 +388,7 @@ list_nodes_return_value_basic_long_node_name_test(_Config) -> rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body)) end), meck:expect(rabbit_nodes, name_type, fun() -> longnames end), - ?assertEqual({ok, {['rabbit@rabbit1.node.consul', 'rabbit@rabbit2.node.consul'], disc}}, + ?assertEqual({ok, {'rabbit@rabbit2.node.consul', disc}}, rabbit_peer_discovery_consul:list_nodes()), ?assert(meck:validate(rabbit_peer_discovery_httpc)). @@ -419,7 +419,7 @@ list_nodes_return_value_long_node_name_and_custom_domain_test(_Config) -> meck:expect(rabbit_nodes, name_type, fun() -> longnames end), - ?assertEqual({ok, {['rabbit@rabbit1.node.internal', 'rabbit@rabbit2.node.internal'], disc}}, + ?assertEqual({ok, {'rabbit@rabbit2.node.internal', disc}}, rabbit_peer_discovery_consul:list_nodes()), ?assert(meck:validate(rabbit_peer_discovery_httpc)). @@ -446,7 +446,7 @@ list_nodes_return_value_srv_address_test(_Config) -> Body = "[{\"Node\": {\"Node\": \"rabbit2.internal.domain\", \"Address\": \"10.20.16.160\"}, \"Checks\": [{\"Node\": \"rabbit2.internal.domain\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq:172.172.16.4.50\", \"Output\": \"\"}, {\"Node\": \"rabbit2.internal.domain\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"172.16.4.51\", \"Port\": 5672, \"ID\": \"rabbitmq:172.16.4.51\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}, {\"Node\": {\"Node\": \"rabbit1.internal.domain\", \"Address\": \"10.20.16.159\"}, \"Checks\": [{\"Node\": \"rabbit1.internal.domain\", \"CheckID\": \"service:rabbitmq\", \"Name\": \"Service \'rabbitmq\' check\", \"ServiceName\": \"rabbitmq\", \"Notes\": \"Connect to the 
port internally every 30 seconds\", \"Status\": \"passing\", \"ServiceID\": \"rabbitmq\", \"Output\": \"\"}, {\"Node\": \"rabbit1.internal.domain\", \"CheckID\": \"serfHealth\", \"Name\": \"Serf Health Status\", \"ServiceName\": \"\", \"Notes\": \"\", \"Status\": \"passing\", \"ServiceID\": \"\", \"Output\": \"Agent alive and reachable\"}], \"Service\": {\"Address\": \"172.172.16.51\", \"Port\": 5672, \"ID\": \"rabbitmq:172.172.16.51\", \"Service\": \"rabbitmq\", \"Tags\": [\"amqp\"]}}]", rabbit_json:try_decode(rabbit_data_coercion:to_binary(Body)) end), - ?assertEqual({ok, {['rabbit@172.16.4.51', 'rabbit@172.172.16.51'], disc}}, + ?assertEqual({ok, {'rabbit@172.16.4.51', disc}}, rabbit_peer_discovery_consul:list_nodes()), ?assert(meck:validate(rabbit_peer_discovery_httpc)). @@ -475,7 +475,7 @@ list_nodes_return_value_nodes_in_warning_state_included_test(_Config) -> rabbit_json:try_decode(list_of_nodes_without_warnings()) end), os:putenv("CONSUL_INCLUDE_NODES_WITH_WARNINGS", "true"), - ?assertEqual({ok, {['rabbit@172.16.4.51'], disc}}, + ?assertEqual({ok, {'rabbit@172.16.4.51', disc}}, rabbit_peer_discovery_consul:list_nodes()), ?assert(meck:validate(rabbit_peer_discovery_httpc)). @@ -504,7 +504,7 @@ list_nodes_return_value_nodes_in_warning_state_filtered_out_test(_Config) -> rabbit_json:try_decode(list_of_nodes_without_warnings()) end), os:putenv("CONSUL_INCLUDE_NODES_WITH_WARNINGS", "false"), - ?assertEqual({ok, {['rabbit@172.16.4.51', 'rabbit@172.172.16.51'], disc}}, + ?assertEqual({ok, {'rabbit@172.16.4.51', disc}}, rabbit_peer_discovery_consul:list_nodes()), ?assert(meck:validate(rabbit_peer_discovery_httpc)).