From e3430aa56dc5d88fb8ab2b6919c0353b64723fe2 Mon Sep 17 00:00:00 2001 From: Iliia Khaprov Date: Fri, 14 Feb 2025 11:16:51 +0100 Subject: [PATCH 1/5] RMQ-1263: Shovel: add forwarded counter Delayed queuese can automatically create associated Shovels to transfer Ready messages to the desired destination. This adds forwarded messages counter which will be used in Management UI for better Shovel internals visibility. (cherry picked from commit a8800b6cd75d8dc42a91f88655058f2ffa3b6ea6) --- ...Q.CLI.Ctl.Commands.DeleteShovelCommand.erl | 2 +- .../src/rabbit_amqp091_shovel.erl | 10 +++--- .../src/rabbit_shovel_behaviour.erl | 16 +++++++-- .../src/rabbit_shovel_status.erl | 34 +++++++++++++++---- deps/rabbitmq_shovel/test/amqp10_SUITE.erl | 6 ++-- .../test/configuration_SUITE.erl | 6 ++-- deps/rabbitmq_shovel/test/dynamic_SUITE.erl | 8 +++-- .../test/shovel_status_command_SUITE.erl | 6 ++-- .../test/shovel_test_utils.erl | 4 +-- .../src/rabbit_shovel_mgmt_util.erl | 2 +- 10 files changed, 66 insertions(+), 28 deletions(-) diff --git a/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand.erl b/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand.erl index 0529e6a207c1..6c8a03006512 100644 --- a/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand.erl +++ b/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DeleteShovelCommand.erl @@ -77,7 +77,7 @@ run([Name], #{node := Node, vhost := VHost}) -> try_force_removing(Node, VHost, Name, ActingUser), {error, rabbit_data_coercion:to_binary(ErrMsg)}; Match -> - {{_Name, _VHost}, _Type, {_State, Opts}, _Timestamp} = Match, + {{_Name, _VHost}, _Type, {_State, Opts}, _Metrics, _Timestamp} = Match, {_, HostingNode} = lists:keyfind(node, 1, Opts), case rabbit_misc:rpc_call( HostingNode, rabbit_shovel_util, delete_shovel, [VHost, Name, ActingUser]) of diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl index 1cc53f8d7f42..1740e7aad2a1 100644 --- a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl +++ b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl @@ -365,15 +365,17 @@ publish(IncomingTag, Method, Msg, ok = amqp_channel:call(OutboundChan, Method, Msg) end, + #{dest := Dst1} = State1 = rabbit_shovel_behaviour:incr_forwarded(State), + rabbit_shovel_behaviour:decr_remaining_unacked( case AckMode of no_ack -> - rabbit_shovel_behaviour:decr_remaining(1, State); + rabbit_shovel_behaviour:decr_remaining(1, State1); on_confirm -> - State#{dest => Dst#{unacked => Unacked#{Seq => IncomingTag}}}; + State1#{dest => Dst1#{unacked => Unacked#{Seq => IncomingTag}}}; on_publish -> - State1 = rabbit_shovel_behaviour:ack(IncomingTag, false, State), - rabbit_shovel_behaviour:decr_remaining(1, State1) + State2 = rabbit_shovel_behaviour:ack(IncomingTag, false, State1), + rabbit_shovel_behaviour:decr_remaining(1, State2) end). control_throttle(State) -> diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl index eef79060330c..67d092eaba3c 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl @@ -30,7 +30,8 @@ status/1, % common functions decr_remaining_unacked/1, - decr_remaining/2 + decr_remaining/2, + incr_forwarded/1 ]). -type tag() :: non_neg_integer(). @@ -155,7 +156,18 @@ nack(Tag, Multi, #{source := #{module := Mod}} = State) -> Mod:nack(Tag, Multi, State). status(#{dest := #{module := Mod}} = State) -> - Mod:status(State). + {Mod:status(State), metrics(State)}. + +incr_forwarded(State = #{dest := Dest}) -> + State#{dest => maps:put(forwarded, maps:get(forwarded, Dest, 0) + 1, Dest)}. + +metrics(_State = #{source := Source, + dest := Dest}) -> + #{remaining => maps:get(remaining, Source, unlimited), + remaining_unacked => maps:get(remaining_unacked, Source, 0), + pending => maps:get(pending, Dest, 0), + forwarded => maps:get(forwarded, Dest, 0)}. + %% Common functions diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_status.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_status.erl index 0612b6c07e26..75d35be1a393 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_status.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_status.erl @@ -49,6 +49,12 @@ info :: info(), blocked_status = running :: blocked_status(), blocked_at :: integer() | undefined, + metrics :: #{remaining := rabbit_types:option(non_neg_integer()) | unlimited, + ramaining_unacked := rabbit_types:option(non_neg_integer()), + pending := rabbit_types:option(non_neg_integer()), + forwarded := rabbit_types:option(non_neg_integer()) + }, + timestamp :: calendar:datetime()}). start_link() -> @@ -112,6 +118,7 @@ handle_call(status, _From, State) -> {reply, [{Entry#entry.name, Entry#entry.type, blocked_status_to_info(Entry), + Entry#entry.metrics, Entry#entry.timestamp} || Entry <- Entries], State}; @@ -120,6 +127,7 @@ handle_call({lookup, Name}, _From, State) -> [Entry] -> [{name, Name}, {type, Entry#entry.type}, {info, blocked_status_to_info(Entry)}, + {metrics, Entry#entry.metrics}, {timestamp, Entry#entry.timestamp}]; [] -> not_found end, @@ -141,6 +149,18 @@ handle_cast({report, Name, Type, Info, Timestamp}, State) -> split_name(Name) ++ split_status(Info)), {noreply, State}; +handle_cast({report_blocked_status, Name, {Status, Metrics}, Timestamp}, State) -> + case Status of + flow -> + true = ets:update_element(?ETS_NAME, Name, [{#entry.blocked_status, flow}, + {#entry.metrics, Metrics}, + {#entry.blocked_at, Timestamp}]); + _ -> + true = ets:update_element(?ETS_NAME, Name, [{#entry.blocked_status, Status}, + {#entry.metrics, Metrics}]) + end, + {noreply, State}; +%% used in tests handle_cast({report_blocked_status, Name, Status, Timestamp}, State) -> case Status of flow -> @@ -178,22 +198,22 @@ code_change(_OldVsn, State, _Extra) -> inject_node_info(Node, Shovels) -> lists:map( %% starting - fun({Name, Type, State, Timestamp}) when is_atom(State) -> + fun({Name, Type, State, Metrics, Timestamp}) when is_atom(State) -> Opts = [{node, Node}], - {Name, Type, {State, Opts}, Timestamp}; + {Name, Type, {State, Opts}, Metrics, Timestamp}; %% terminated - ({Name, Type, {terminated, Reason}, Timestamp}) -> - {Name, Type, {terminated, Reason}, Timestamp}; + ({Name, Type, {terminated, Reason}, Metrics, Timestamp}) -> + {Name, Type, {terminated, Reason}, Metrics, Timestamp}; %% running - ({Name, Type, {State, Opts}, Timestamp}) -> + ({Name, Type, {State, Opts}, Metrics, Timestamp}) -> Opts1 = Opts ++ [{node, Node}], - {Name, Type, {State, Opts1}, Timestamp} + {Name, Type, {State, Opts1}, Metrics, Timestamp} end, Shovels). -spec find_matching_shovel(rabbit_types:vhost(), binary(), [status_tuple()]) -> status_tuple() | undefined. find_matching_shovel(VHost, Name, Shovels) -> case lists:filter( - fun ({{V, S}, _Kind, _Status, _}) -> + fun ({{V, S}, _Kind, _Status, _Metrics, _}) -> VHost =:= V andalso Name =:= S end, Shovels) of [] -> undefined; diff --git a/deps/rabbitmq_shovel/test/amqp10_SUITE.erl b/deps/rabbitmq_shovel/test/amqp10_SUITE.erl index 5ecf53279c8d..937d37037cd3 100644 --- a/deps/rabbitmq_shovel/test/amqp10_SUITE.erl +++ b/deps/rabbitmq_shovel/test/amqp10_SUITE.erl @@ -139,7 +139,7 @@ amqp10_destination(Config, AckMode) -> throw(timeout_waiting_for_deliver1) end, - [{test_shovel, static, {running, _Info}, _Time}] = + [{test_shovel, static, {running, _Info}, _Metrics, _Time}] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, status, []), amqp10_client:detach_link(Receiver), @@ -183,7 +183,7 @@ amqp10_source(Config, AckMode) -> after ?TIMEOUT -> throw(timeout_waiting_for_deliver1) end, - [{test_shovel, static, {running, _Info}, _Time}] = + [{test_shovel, static, {running, _Info}, _Metrics, _Time}] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, status, []), rabbit_ct_client_helpers:close_channel(Chan). @@ -267,7 +267,7 @@ setup_shovel(ShovelConfig) -> await_running_shovel(test_shovel). await_running_shovel(Name) -> - case [N || {N, _, {running, _}, _} + case [N || {N, _, {running, _}, _, _} <- rabbit_shovel_status:status(), N =:= Name] of [_] -> ok; diff --git a/deps/rabbitmq_shovel/test/configuration_SUITE.erl b/deps/rabbitmq_shovel/test/configuration_SUITE.erl index a0f9385e955c..603243966fa5 100644 --- a/deps/rabbitmq_shovel/test/configuration_SUITE.erl +++ b/deps/rabbitmq_shovel/test/configuration_SUITE.erl @@ -277,7 +277,7 @@ run_valid_test(Config) -> after ?TIMEOUT -> throw(timeout_waiting_for_deliver1) end, - [{test_shovel, static, {running, _Info}, _Time}] = + [{test_shovel, static, {running, _Info}, _Metrics, _Time}] = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, status, []), @@ -407,7 +407,7 @@ setup_shovels2(Config) -> ok = application:start(rabbitmq_shovel). await_running_shovel(Name) -> - case [N || {N, _, {running, _}, _} + case [N || {N, _, {running, _}, _Metrics, _} <- rabbit_shovel_status:status(), N =:= Name] of [_] -> ok; @@ -415,7 +415,7 @@ await_running_shovel(Name) -> await_running_shovel(Name) end. await_terminated_shovel(Name) -> - case [N || {N, _, {terminated, _}, _} + case [N || {N, _, {terminated, _}, _Metrics, _} <- rabbit_shovel_status:status(), N =:= Name] of [_] -> ok; diff --git a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl index 554f25393fce..e6e21e02ddda 100644 --- a/deps/rabbitmq_shovel/test/dynamic_SUITE.erl +++ b/deps/rabbitmq_shovel/test/dynamic_SUITE.erl @@ -118,13 +118,17 @@ end_per_testcase(Testcase, Config) -> %% ------------------------------------------------------------------- simple(Config) -> + Name = <<"test">>, with_ch(Config, fun (Ch) -> shovel_test_utils:set_param( Config, - <<"test">>, [{<<"src-queue">>, <<"src">>}, + Name, [{<<"src-queue">>, <<"src">>}, {<<"dest-queue">>, <<"dest">>}]), - publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>) + publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>), + Status = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_shovel_status, lookup, [{<<"/">>, Name}]), + ?assertMatch([_|_], Status), + ?assertMatch(#{metrics := #{forwarded := 1}}, maps:from_list(Status)) end). quorum_queues(Config) -> diff --git a/deps/rabbitmq_shovel/test/shovel_status_command_SUITE.erl b/deps/rabbitmq_shovel/test/shovel_status_command_SUITE.erl index 26fc2aa6641d..a4bbbb29b958 100644 --- a/deps/rabbitmq_shovel/test/shovel_status_command_SUITE.erl +++ b/deps/rabbitmq_shovel/test/shovel_status_command_SUITE.erl @@ -82,11 +82,11 @@ run_starting(Config) -> [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Opts = #{node => A}, case ?CMD:run([], Opts) of - {stream, [{{<<"/">>, <<"test">>}, dynamic, starting, _}]} -> + {stream, [{{<<"/">>, <<"test">>}, dynamic, starting, _, _}]} -> ok; {stream, []} -> throw(shovel_not_found); - {stream, [{{<<"/">>, <<"test">>}, dynamic, {running, _}, _}]} -> + {stream, [{{<<"/">>, <<"test">>}, dynamic, {running, _}, _, _}]} -> ct:pal("Shovel is already running, starting could not be tested!") end, shovel_test_utils:clear_param(Config, <<"test">>). @@ -107,7 +107,7 @@ run_running(Config) -> {<<"dest-queue">>, <<"dest">>}]), [A] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Opts = #{node => A}, - {stream, [{{<<"/">>, <<"test">>}, dynamic, {running, _}, _}]} + {stream, [{{<<"/">>, <<"test">>}, dynamic, {running, _}, _, _}]} = ?CMD:run([], Opts), shovel_test_utils:clear_param(Config, <<"test">>). diff --git a/deps/rabbitmq_shovel/test/shovel_test_utils.erl b/deps/rabbitmq_shovel/test/shovel_test_utils.erl index 3107f2ecbcb2..ae18db01de3b 100644 --- a/deps/rabbitmq_shovel/test/shovel_test_utils.erl +++ b/deps/rabbitmq_shovel/test/shovel_test_utils.erl @@ -65,7 +65,7 @@ shovels_from_status() -> shovels_from_status(ExpectedState) -> S = rabbit_shovel_status:status(), - [N || {{<<"/">>, N}, dynamic, {State, _}, _} <- S, State == ExpectedState]. + [N || {{<<"/">>, N}, dynamic, {State, _}, _, _} <- S, State == ExpectedState]. get_shovel_status(Config, Name) -> get_shovel_status(Config, 0, Name). @@ -111,4 +111,4 @@ restart_shovel(Config, Name) -> restart_shovel(Config, Node, Name) -> rabbit_ct_broker_helpers:rpc(Config, - Node, rabbit_shovel_util, restart_shovel, [<<"/">>, Name]). \ No newline at end of file + Node, rabbit_shovel_util, restart_shovel, [<<"/">>, Name]). diff --git a/deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt_util.erl b/deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt_util.erl index b6f5a04c5f8b..154aed959ab8 100644 --- a/deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt_util.erl +++ b/deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt_util.erl @@ -42,7 +42,7 @@ status(Node) -> [format(Node, I) || I <- Status] end. -format(Node, {Name, Type, Info, TS}) -> +format(Node, {Name, Type, Info, Metrics, TS}) -> [{node, Node}, {timestamp, format_ts(TS)}] ++ format_name(Type, Name) ++ format_info(Info). From d4c1121c7751f562ef4f1d4a885617085b09ab90 Mon Sep 17 00:00:00 2001 From: Diana Parra Corbacho Date: Mon, 17 Mar 2025 15:35:22 +0100 Subject: [PATCH 2/5] RMQ-1263: dialyze, unused var (cherry picked from commit 68872f81074d378f76ffa44e7111e7979cdd8fd0) --- deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt_util.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt_util.erl b/deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt_util.erl index 154aed959ab8..0b05bda1e55e 100644 --- a/deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt_util.erl +++ b/deps/rabbitmq_shovel_management/src/rabbit_shovel_mgmt_util.erl @@ -42,7 +42,7 @@ status(Node) -> [format(Node, I) || I <- Status] end. -format(Node, {Name, Type, Info, Metrics, TS}) -> +format(Node, {Name, Type, Info, _Metrics, TS}) -> [{node, Node}, {timestamp, format_ts(TS)}] ++ format_name(Type, Name) ++ format_info(Info). From c2569d26f293edfe856692d74e6925b5c9eb7627 Mon Sep 17 00:00:00 2001 From: Iliia Khaprov Date: Mon, 17 Mar 2025 21:36:43 +0100 Subject: [PATCH 3/5] RMQ-1263: Shovels forward counter - fix dialyzer (cherry picked from commit af22cf427a7054d93b3dd64fda01a86649fdd7c5) --- ...Q.CLI.Ctl.Commands.RestartShovelCommand.erl | 2 +- .../src/rabbit_shovel_behaviour.erl | 4 +++- .../src/rabbit_shovel_status.erl | 18 ++++++++++-------- .../src/rabbit_shovel_worker.erl | 4 ++-- 4 files changed, 16 insertions(+), 12 deletions(-) diff --git a/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartShovelCommand.erl b/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartShovelCommand.erl index a1b762bba9cf..c8be462176cc 100644 --- a/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartShovelCommand.erl +++ b/deps/rabbitmq_shovel/src/Elixir.RabbitMQ.CLI.Ctl.Commands.RestartShovelCommand.erl @@ -63,7 +63,7 @@ run([Name], #{node := Node, vhost := VHost}) -> undefined -> {error, rabbit_data_coercion:to_binary(ErrMsg)}; Match -> - {{_Name, _VHost}, _Type, {_State, Opts}, _Timestamp} = Match, + {{_Name, _VHost}, _Type, {_State, Opts}, _Metrics, _Timestamp} = Match, {_, HostingNode} = lists:keyfind(node, 1, Opts), case rabbit_misc:rpc_call( HostingNode, rabbit_shovel_util, restart_shovel, [VHost, Name]) of diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl index 67d092eaba3c..823dd481e9dc 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl @@ -83,7 +83,7 @@ -callback forward(Tag :: tag(), Props :: #{atom() => any()}, Payload :: binary(), state()) -> state() | {stop, any()}. --callback status(state()) -> rabbit_shovel_status:blocked_status() | ignore. +-callback status(state()) -> rabbit_shovel_status:shovel_status(). -spec parse(atom(), binary(), {source | destination, proplists:proplist()}) -> source_config() | dest_config(). @@ -155,12 +155,14 @@ ack(Tag, Multi, #{source := #{module := Mod}} = State) -> nack(Tag, Multi, #{source := #{module := Mod}} = State) -> Mod:nack(Tag, Multi, State). +-spec status(state()) -> {rabbit_shovel_status:shovel_status(), rabbit_shovel_status:metrics()}. status(#{dest := #{module := Mod}} = State) -> {Mod:status(State), metrics(State)}. incr_forwarded(State = #{dest := Dest}) -> State#{dest => maps:put(forwarded, maps:get(forwarded, Dest, 0) + 1, Dest)}. +-spec metrics(state()) -> rabbit_shovel_status:metrics(). metrics(_State = #{source := Source, dest := Dest}) -> #{remaining => maps:get(remaining, Source, unlimited), diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_status.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_status.erl index 75d35be1a393..e8b5800680b0 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_status.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_status.erl @@ -36,12 +36,18 @@ | {running, proplists:proplist()} | {terminated, term()}. -type blocked_status() :: running | flow | blocked. +-type shovel_status() :: blocked_status() | ignore. -type name() :: binary() | {rabbit_types:vhost(), binary()}. -type type() :: static | dynamic. --type status_tuple() :: {name(), type(), info(), calendar:datetime()}. +-type metrics() :: #{remaining := rabbit_types:option(non_neg_integer()) | unlimited, + remaining_unacked := rabbit_types:option(non_neg_integer()), + pending := rabbit_types:option(non_neg_integer()), + forwarded := rabbit_types:option(non_neg_integer()) + } | #{}. +-type status_tuple() :: {name(), type(), info(), metrics(), calendar:datetime()}. --export_type([info/0, blocked_status/0]). +-export_type([info/0, blocked_status/0, shovel_status/0, metrics/0]). -record(state, {timer}). -record(entry, {name :: name(), @@ -49,11 +55,7 @@ info :: info(), blocked_status = running :: blocked_status(), blocked_at :: integer() | undefined, - metrics :: #{remaining := rabbit_types:option(non_neg_integer()) | unlimited, - ramaining_unacked := rabbit_types:option(non_neg_integer()), - pending := rabbit_types:option(non_neg_integer()), - forwarded := rabbit_types:option(non_neg_integer()) - }, + metrics = #{} :: metrics(), timestamp :: calendar:datetime()}). @@ -64,7 +66,7 @@ start_link() -> report(Name, Type, Info) -> gen_server:cast(?SERVER, {report, Name, Type, Info, calendar:local_time()}). --spec report_blocked_status(name(), blocked_status()) -> ok. +-spec report_blocked_status(name(), {blocked_status(), metrics()} | blocked_status()) -> ok. report_blocked_status(Name, Status) -> gen_server:cast(?SERVER, {report_blocked_status, Name, Status, erlang:monotonic_time()}). diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl index 09d7aa38e720..541df58e1334 100644 --- a/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl +++ b/deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl @@ -21,7 +21,7 @@ -record(state, {name :: binary() | {rabbit_types:vhost(), binary()}, type :: static | dynamic, config :: rabbit_shovel_behaviour:state(), - last_reported_status = running :: rabbit_shovel_status:blocked_status()}). + last_reported_status = {running, #{}} :: {rabbit_shovel_status:blocked_status(), rabbit_shovel_status:metrics()}}). start_link(Type, Name, Config) -> ShovelParameter = rabbit_shovel_util:get_shovel_parameter(Name), @@ -224,7 +224,7 @@ human_readable_name(Name) -> maybe_report_blocked_status(#state{config = Config, last_reported_status = LastStatus} = State) -> case rabbit_shovel_behaviour:status(Config) of - ignore -> + {ignore, _} -> State; LastStatus -> State; From 63b58593609c4fd577ced4139c9c58792215de70 Mon Sep 17 00:00:00 2001 From: Iliia Khaprov Date: Wed, 26 Mar 2025 11:22:36 +0100 Subject: [PATCH 4/5] RMQ-1263: readd lost shovel_prometheus parts --- .../src/rabbit_shovel_prometheus_collector.erl | 4 ++-- .../test/prometheus_rabbitmq_shovel_collector_SUITE.erl | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/deps/rabbitmq_shovel_prometheus/src/rabbit_shovel_prometheus_collector.erl b/deps/rabbitmq_shovel_prometheus/src/rabbit_shovel_prometheus_collector.erl index 13ad734ac042..dbe2e2f97b56 100644 --- a/deps/rabbitmq_shovel_prometheus/src/rabbit_shovel_prometheus_collector.erl +++ b/deps/rabbitmq_shovel_prometheus/src/rabbit_shovel_prometheus_collector.erl @@ -29,9 +29,9 @@ deregister_cleanup(_) -> ok. collect_mf(_Registry, Callback) -> Status = rabbit_shovel_status:status(500), - {StaticStatusGroups, DynamicStatusGroups} = lists:foldl(fun({_,static,{S, _}, _}, {SMap, DMap}) -> + {StaticStatusGroups, DynamicStatusGroups} = lists:foldl(fun({_,static,{S, _}, _, _}, {SMap, DMap}) -> {maps:update_with(S, fun(C) -> C + 1 end, 1, SMap), DMap}; - ({_,dynamic,{S, _}, _}, {SMap, DMap}) -> + ({_,dynamic,{S, _}, _, _}, {SMap, DMap}) -> {SMap, maps:update_with(S, fun(C) -> C + 1 end, 1, DMap)} end, {#{}, #{}}, Status), diff --git a/deps/rabbitmq_shovel_prometheus/test/prometheus_rabbitmq_shovel_collector_SUITE.erl b/deps/rabbitmq_shovel_prometheus/test/prometheus_rabbitmq_shovel_collector_SUITE.erl index 495f23e24cb5..10ca7cd17c52 100644 --- a/deps/rabbitmq_shovel_prometheus/test/prometheus_rabbitmq_shovel_collector_SUITE.erl +++ b/deps/rabbitmq_shovel_prometheus/test/prometheus_rabbitmq_shovel_collector_SUITE.erl @@ -226,10 +226,10 @@ await_shovel(Name, Type) -> shovels_from_status(ExpectedState, dynamic) -> S = rabbit_shovel_status:status(), - [N || {{<<"/">>, N}, dynamic, {State, _}, _} <- S, State == ExpectedState]; + [N || {{<<"/">>, N}, dynamic, {State, _}, _, _} <- S, State == ExpectedState]; shovels_from_status(ExpectedState, static) -> S = rabbit_shovel_status:status(), - [N || {N, static, {State, _}, _} <- S, State == ExpectedState]. + [N || {N, static, {State, _}, _, _} <- S, State == ExpectedState]. get_shovel_status(Config, Name) -> get_shovel_status(Config, 0, Name). From 1f20543ee3838a80dc4c40079cf017b4a75746a4 Mon Sep 17 00:00:00 2001 From: Iliia Khaprov Date: Wed, 26 Mar 2025 18:03:20 +0100 Subject: [PATCH 5/5] RMQ-1263: shovel forward counter: try fixing rolling_upgrade_SUITE --- deps/rabbitmq_shovel/test/shovel_test_utils.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deps/rabbitmq_shovel/test/shovel_test_utils.erl b/deps/rabbitmq_shovel/test/shovel_test_utils.erl index ae18db01de3b..b3593c4d9984 100644 --- a/deps/rabbitmq_shovel/test/shovel_test_utils.erl +++ b/deps/rabbitmq_shovel/test/shovel_test_utils.erl @@ -65,7 +65,8 @@ shovels_from_status() -> shovels_from_status(ExpectedState) -> S = rabbit_shovel_status:status(), - [N || {{<<"/">>, N}, dynamic, {State, _}, _, _} <- S, State == ExpectedState]. + [N || {{<<"/">>, N}, dynamic, {State, _}, _, _} <- S, State == ExpectedState] ++ + [N || {{<<"/">>, N}, dynamic, {State, _}, _} <- S, State == ExpectedState]. get_shovel_status(Config, Name) -> get_shovel_status(Config, 0, Name).