Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_db_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ can_join(RemoteNode) ->
"DB: checking if `~ts` can join cluster using remote node `~ts`",
[node(), RemoteNode],
#{domain => ?RMQLOG_DOMAIN_DB}),
case rabbit_feature_flags:check_node_compatibility(RemoteNode) of
case rabbit_feature_flags:check_node_compatibility(RemoteNode, true) of
ok ->
case rabbit_khepri:is_enabled(RemoteNode) of
true -> can_join_using_khepri(RemoteNode);
Expand Down
41 changes: 36 additions & 5 deletions deps/rabbit/src/rabbit_feature_flags.erl
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@
init/0,
get_state/1,
get_stability/1,
check_node_compatibility/1,
check_node_compatibility/1, check_node_compatibility/2,
sync_feature_flags_with_cluster/2,
refresh_feature_flags_after_app_load/0,
enabled_feature_flags_list_file/0
Expand Down Expand Up @@ -1302,7 +1302,9 @@ does_node_support(Node, FeatureNames, Timeout) ->
false
end.

-spec check_node_compatibility(node()) -> ok | {error, any()}.
-spec check_node_compatibility(RemoteNode) -> Ret when
RemoteNode :: node(),
Ret :: ok | {error, any()}.
%% @doc
%% Checks if a node is compatible with the local node.
%%
Expand All @@ -1314,11 +1316,40 @@ does_node_support(Node, FeatureNames, Timeout) ->
%% local node</li>
%% </ol>
%%
%% @param Node the name of the remote node to test.
%% @param RemoteNode the name of the remote node to test.
%% @returns `ok' if they are compatible, `{error, Reason}' if they are not.

check_node_compatibility(RemoteNode) ->
check_node_compatibility(RemoteNode, false).

-spec check_node_compatibility(RemoteNode, LocalNodeAsVirgin) -> Ret when
RemoteNode :: node(),
LocalNodeAsVirgin :: boolean(),
Ret :: ok | {error, any()}.
%% @doc
%% Checks if a node is compatible with the local node.
%%
%% To be compatible, the following two conditions must be met:
%% <ol>
%% <li>feature flags enabled on the local node must be supported by the
%% remote node</li>
%% <li>feature flags enabled on the remote node must be supported by the
%% local node</li>
%% </ol>
%%
%% Unlike {@link check_node_compatibility/1}, the local node's feature flags
%% inventory is evaluated as if the node was virgin if `LocalNodeAsVirgin' is
%% true. This is useful if the local node will be reset as part of joining a
%% remote cluster for instance.
%%
%% @param RemoteNode the name of the remote node to test.
%% @param LocalNodeAsVirgin flag to indicate if the local node should be
%% evaluated as if it was virgin.
%% @returns `ok' if they are compatible, `{error, Reason}' if they are not.

check_node_compatibility(Node) ->
rabbit_ff_controller:check_node_compatibility(Node).
check_node_compatibility(RemoteNode, LocalNodeAsVirgin) ->
rabbit_ff_controller:check_node_compatibility(
RemoteNode, LocalNodeAsVirgin).

run_feature_flags_mod_on_remote_node(Node, Function, Args, Timeout) ->
rabbit_ff_controller:rpc_call(Node, ?MODULE, Function, Args, Timeout).
Expand Down
77 changes: 64 additions & 13 deletions deps/rabbit/src/rabbit_ff_controller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
-export([is_supported/1, is_supported/2,
enable/1,
enable_default/0,
check_node_compatibility/1,
check_node_compatibility/2,
sync_cluster/1,
refresh_after_app_load/0,
get_forced_feature_flag_names/0]).
Expand Down Expand Up @@ -134,20 +134,30 @@ enable_default() ->
Ret
end.

check_node_compatibility(RemoteNode) ->
check_node_compatibility(RemoteNode, LocalNodeAsVirgin) ->
ThisNode = node(),
?LOG_DEBUG(
"Feature flags: CHECKING COMPATIBILITY between nodes `~ts` and `~ts`",
[ThisNode, RemoteNode],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS}),
case LocalNodeAsVirgin of
true ->
?LOG_DEBUG(
"Feature flags: CHECKING COMPATIBILITY between nodes `~ts` "
"and `~ts`; consider node `~ts` as virgin",
[ThisNode, RemoteNode, ThisNode],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS});
false ->
?LOG_DEBUG(
"Feature flags: CHECKING COMPATIBILITY between nodes `~ts` "
"and `~ts`",
[ThisNode, RemoteNode],
#{domain => ?RMQLOG_DOMAIN_FEAT_FLAGS})
end,
%% We don't go through the controller process to check nodes compatibility
%% because this function is used while `rabbit' is stopped usually.
%%
%% There is no benefit in starting a controller just for this check
%% because it would not guaranty that the compatibility remains true after
%% this function finishes and before the node starts and synchronizes
%% feature flags.
check_node_compatibility_task(ThisNode, RemoteNode).
check_node_compatibility_task(ThisNode, RemoteNode, LocalNodeAsVirgin).

sync_cluster(Nodes) ->
?LOG_DEBUG(
Expand Down Expand Up @@ -382,12 +392,14 @@ notify_waiting_controller({ControlerPid, _} = From) ->
%% Code to check compatibility between nodes.
%% --------------------------------------------------------------------

-spec check_node_compatibility_task(Node, Node) -> Ret when
Node :: node(),
-spec check_node_compatibility_task(NodeA, NodeB, NodeAAsVirigin) -> Ret when
NodeA :: node(),
NodeB :: node(),
NodeAAsVirigin :: boolean(),
Ret :: ok | {error, Reason},
Reason :: incompatible_feature_flags.

check_node_compatibility_task(NodeA, NodeB) ->
check_node_compatibility_task(NodeA, NodeB, NodeAAsVirigin) ->
?LOG_NOTICE(
"Feature flags: checking nodes `~ts` and `~ts` compatibility...",
[NodeA, NodeB],
Expand All @@ -400,7 +412,8 @@ check_node_compatibility_task(NodeA, NodeB) ->
_ when is_list(NodesB) ->
check_node_compatibility_task1(
NodeA, NodesA,
NodeB, NodesB);
NodeB, NodesB,
NodeAAsVirigin);
Error ->
?LOG_WARNING(
"Feature flags: "
Expand All @@ -419,10 +432,12 @@ check_node_compatibility_task(NodeA, NodeB) ->
{error, {aborted_feature_flags_compat_check, Error}}
end.

check_node_compatibility_task1(NodeA, NodesA, NodeB, NodesB)
check_node_compatibility_task1(NodeA, NodesA, NodeB, NodesB, NodeAAsVirigin)
when is_list(NodesA) andalso is_list(NodesB) ->
case collect_inventory_on_nodes(NodesA) of
{ok, InventoryA} ->
{ok, InventoryA0} ->
InventoryA = virtually_reset_inventory(
InventoryA0, NodeAAsVirigin),
?LOG_DEBUG(
"Feature flags: inventory of node `~ts`:~n~tp",
[NodeA, InventoryA],
Expand Down Expand Up @@ -488,6 +503,42 @@ list_nodes_clustered_with(Node) ->
ListOrError -> ListOrError
end.

virtually_reset_inventory(
#{feature_flags := FeatureFlags,
states_per_node := StatesPerNode} = Inventory,
true = _NodeAsVirgin) ->
[Node | _] = maps:keys(StatesPerNode),
FeatureStates0 = maps:get(Node, StatesPerNode),
FeatureStates1 = maps:map(
fun(FeatureName, _FeatureState) ->
FeatureProps = maps:get(
FeatureName, FeatureFlags),
state_after_virtual_state(
FeatureName, FeatureProps)
end, FeatureStates0),
StatesPerNode1 = maps:map(
fun(_Node, _FeatureStates) ->
FeatureStates1
end, StatesPerNode),
Inventory1 = Inventory#{states_per_node => StatesPerNode1},
Inventory1;
virtually_reset_inventory(
Inventory,
false = _NodeAsVirgin) ->
Inventory.

state_after_virtual_state(_FeatureName, FeatureProps)
when ?IS_FEATURE_FLAG(FeatureProps) ->
Stability = rabbit_feature_flags:get_stability(FeatureProps),
case Stability of
required -> true;
_ -> false
end;
state_after_virtual_state(FeatureName, FeatureProps)
when ?IS_DEPRECATION(FeatureProps) ->
not rabbit_deprecated_features:should_be_permitted(
FeatureName, FeatureProps).

-spec are_compatible(Inventory, Inventory) -> AreCompatible when
Inventory :: rabbit_feature_flags:cluster_inventory(),
AreCompatible :: boolean().
Expand Down
5 changes: 1 addition & 4 deletions deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -871,10 +871,7 @@ check_cluster_consistency(Node, CheckNodesConsistency) ->
Error
end;
{_OTP, _Rabbit, {ok, Status}} ->
case rabbit_db_cluster:check_compatibility(Node) of
ok -> {ok, Status};
Error -> Error
end
{ok, Status}
end.

remote_node_info(Node) ->
Expand Down
41 changes: 35 additions & 6 deletions deps/rabbit/src/rabbit_mnesia.erl
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,24 @@ cluster_nodes(WhichNodes) -> cluster_status(WhichNodes).
cluster_status_from_mnesia() ->
case is_running() of
false ->
{error, mnesia_not_running};
case rabbit_khepri:get_feature_state() of
enabled ->
%% To keep this API compatible with older remote nodes who
%% don't know about Khepri, we take the cluster status
%% from `rabbit_khepri' and reformat the return value to
%% ressemble the node from this module.
%%
%% Both nodes won't be compatible, but let's leave that
%% decision to the Feature flags subsystem.
case rabbit_khepri:cluster_status_from_khepri() of
{ok, {All, Running}} ->
{ok, {All, All, Running}};
{error, _} = Error ->
Error
end;
_ ->
{error, mnesia_not_running}
end;
true ->
%% If the tables are not present, it means that
%% `init_db/3' hasn't been run yet. In other words, either
Expand Down Expand Up @@ -475,8 +492,23 @@ members() ->
end.

node_info() ->
%% Once Khepri is enabled, the Mnesia protocol is irrelevant obviously.
%%
%% That said, older remote nodes who don't known about Khepri will request
%% this information anyway as part of calling `node_info/0'. Here, we
%% simply return `unsupported' as the Mnesia protocol. Older versions of
%% RabbitMQ will skip the protocol negotiation and use other ways.
%%
%% The goal is mostly to let older nodes which check Mnesia before feature
%% flags to reach the feature flags check. This one will correctly
%% indicate that they are incompatible. That's why we return `unsupported'
%% here, even if we could return the actual Mnesia protocol.
MnesiaProtocol = case rabbit_khepri:get_feature_state() of
enabled -> unsupported;
_ -> mnesia:system_info(protocol_version)
end,
{rabbit_misc:otp_release(), rabbit_misc:version(),
mnesia:system_info(protocol_version),
MnesiaProtocol,
cluster_status_from_mnesia()}.

-spec node_type() -> rabbit_db_cluster:node_type().
Expand Down Expand Up @@ -694,10 +726,7 @@ check_cluster_consistency(Node, CheckNodesConsistency) ->
Error
end;
{_OTP, _Rabbit, _Protocol, {ok, Status}} ->
case rabbit_db_cluster:check_compatibility(Node) of
ok -> {ok, Status};
Error -> Error
end
{ok, Status}
end.

remote_node_info(Node) ->
Expand Down
49 changes: 49 additions & 0 deletions deps/rabbit/test/feature_flags_v2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
failed_enable_feature_flag_with_post_enable/1,
have_required_feature_flag_in_cluster_and_add_member_with_it_disabled/1,
have_required_feature_flag_in_cluster_and_add_member_without_it/1,
have_unknown_feature_flag_in_cluster_and_add_member_with_it_enabled/1,
error_during_migration_after_initial_success/1,
controller_waits_for_own_task_to_finish_before_exiting/1,
controller_waits_for_remote_task_to_finish_before_exiting/1
Expand Down Expand Up @@ -98,6 +99,7 @@ groups() ->
failed_enable_feature_flag_with_post_enable,
have_required_feature_flag_in_cluster_and_add_member_with_it_disabled,
have_required_feature_flag_in_cluster_and_add_member_without_it,
have_unknown_feature_flag_in_cluster_and_add_member_with_it_enabled,
error_during_migration_after_initial_success,
controller_waits_for_own_task_to_finish_before_exiting,
controller_waits_for_remote_task_to_finish_before_exiting
Expand Down Expand Up @@ -1506,6 +1508,53 @@ have_required_feature_flag_in_cluster_and_add_member_without_it(
|| Node <- AllNodes],
ok.

have_unknown_feature_flag_in_cluster_and_add_member_with_it_enabled(
Config) ->
[NewNode | [FirstNode | _] = Nodes] = ?config(nodes, Config),
connect_nodes(Nodes),
override_running_nodes([NewNode]),
override_running_nodes(Nodes),

FeatureName = ?FUNCTION_NAME,
FeatureFlags = #{FeatureName =>
#{provided_by => rabbit,
stability => stable}},
?assertEqual(ok, inject_on_nodes([NewNode], FeatureFlags)),

ct:pal(
"Checking the feature flag is unsupported on the cluster but enabled on "
"the standalone node"),
ok = run_on_node(
NewNode,
fun() ->
?assertEqual(ok, rabbit_feature_flags:enable(FeatureName)),
?assert(rabbit_feature_flags:is_enabled(FeatureName)),
ok
end,
[]),
_ = [ok =
run_on_node(
Node,
fun() ->
?assertNot(rabbit_feature_flags:is_supported(FeatureName)),
?assertNot(rabbit_feature_flags:is_enabled(FeatureName)),
ok
end,
[])
|| Node <- Nodes],

%% Check compatibility between NewNodes and Nodes.
ok = run_on_node(
NewNode,
fun() ->
?assertEqual(
ok,
rabbit_feature_flags:check_node_compatibility(
FirstNode, true)),
ok
end, []),
ok.

error_during_migration_after_initial_success(Config) ->
AllNodes = [NewNode | [FirstNode | _] = Nodes] = ?config(nodes, Config),
connect_nodes(Nodes),
Expand Down
Loading