From b5700751cfdc0864f14eed9eb04e78ed9f68f859 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Fri, 10 Sep 2021 15:14:09 +0200 Subject: [PATCH] Add HTTP endpoint for stream publishers References #3389 --- .../priv/www/js/dispatcher.js | 2 +- .../priv/www/js/stream.js | 14 +++-- .../src/rabbit_stream_mgmt_db.erl | 31 ++++++++++- .../src/rabbit_stream_publishers_mgmt.erl | 43 ++++++++++++---- .../test/http_SUITE.erl | 16 +++--- .../java/com/rabbitmq/stream/HttpTest.java | 51 ++++++++++++++++++- 6 files changed, 133 insertions(+), 24 deletions(-) diff --git a/deps/rabbitmq_management/priv/www/js/dispatcher.js b/deps/rabbitmq_management/priv/www/js/dispatcher.js index 718808368b84..feb1974c3c9c 100644 --- a/deps/rabbitmq_management/priv/www/js/dispatcher.js +++ b/deps/rabbitmq_management/priv/www/js/dispatcher.js @@ -109,7 +109,7 @@ dispatcher_add(function(sammy) { var extra = QUEUE_EXTRA_CONTENT_REQUESTS[i](vhost, queue); for (key in extra) { if(extra.hasOwnProperty(key)){ - requests[key] = "/stream/publishers"; + requests[key] = extra[key]; } } } diff --git a/deps/rabbitmq_stream_management/priv/www/js/stream.js b/deps/rabbitmq_stream_management/priv/www/js/stream.js index 0179f019f9a6..ac546ad51b10 100644 --- a/deps/rabbitmq_stream_management/priv/www/js/stream.js +++ b/deps/rabbitmq_stream_management/priv/www/js/stream.js @@ -64,14 +64,20 @@ CONSUMER_OWNER_FORMATTERS.push({ CONSUMER_OWNER_FORMATTERS.sort(CONSUMER_OWNER_FORMATTERS_COMPARATOR); QUEUE_EXTRA_CONTENT_REQUESTS.push(function(vhost, queue) { - return {'extra_stream_publishers' : '/stream/publishers'}; + return {'extra_stream_publishers' : '/stream/publishers/' + esc(vhost) + '/' + esc(queue)}; }); QUEUE_EXTRA_CONTENT.push(function(queue, extraContent) { if (is_stream(queue)) { - return '

Stream publishers

' + - format('streamPublishersList', {'publishers': extraContent['extra_stream_publishers']}) + - '
'; + var publishers = extraContent['extra_stream_publishers']; + if (publishers !== undefined) { + return '

Stream publishers

' + + format('streamPublishersList', {'publishers': publishers}) + + '
'; + + } else { + return ''; + } } else { return ''; } diff --git a/deps/rabbitmq_stream_management/src/rabbit_stream_mgmt_db.erl b/deps/rabbitmq_stream_management/src/rabbit_stream_mgmt_db.erl index bbef5b0ac917..b5ac9fe98797 100644 --- a/deps/rabbitmq_stream_management/src/rabbit_stream_mgmt_db.erl +++ b/deps/rabbitmq_stream_management/src/rabbit_stream_mgmt_db.erl @@ -17,7 +17,8 @@ get_all_publishers/1]). -export([entity_data/4]). -export([get_connection_consumers/1, - get_connection_publishers/1]). + get_connection_publishers/1, + get_stream_publishers/1]). get_all_consumers(VHosts) -> rabbit_mgmt_db:submit(fun(_Interval) -> consumers_stats(VHosts) end). @@ -35,6 +36,11 @@ get_connection_publishers(ConnectionPid) when is_pid(ConnectionPid) -> connection_publishers_stats(ConnectionPid) end). +get_stream_publishers(QueueResource) -> + rabbit_mgmt_db:submit(fun(_Interval) -> + stream_publishers_stats(QueueResource) + end). + consumers_stats(VHost) -> Data = rabbit_mgmt_db:get_data_from_nodes({rabbit_stream_mgmt_db, @@ -67,6 +73,14 @@ connection_publishers_stats(ConnectionPid) -> fun publishers_by_connection/1]}), [V || {_, V} <- maps:to_list(Data)]. +stream_publishers_stats(Queue) -> + Data = + rabbit_mgmt_db:get_data_from_nodes({rabbit_stream_mgmt_db, + entity_data, + [Queue, ?ENTITY_PUBLISHER, + fun publishers_by_stream/1]}), + [V || {_, V} <- maps:to_list(Data)]. + entity_data(_Pid, Param, EntityType, QueryFun) -> maps:from_list([begin AugmentedPublisher = augment_entity(EntityType, P), @@ -103,12 +117,27 @@ consumers_by_connection(ConnectionPid) -> publishers_by_connection(ConnectionPid) -> get_entity_stats(?TABLE_PUBLISHER, ConnectionPid). +publishers_by_stream(QueueResource) -> + get_entity_stats_by_resource(?TABLE_PUBLISHER, QueueResource). + get_entity_stats(Table, Id) -> ets:select(Table, match_entity_spec(Id)). match_entity_spec(ConnectionId) -> [{{{'_', '$1', '_'}, '_'}, [{'==', ConnectionId, '$1'}], ['$_']}]. +get_entity_stats_by_resource(Table, Resource) -> + ets:select(Table, match_entity_spec_by_resource(Resource)). + +match_entity_spec_by_resource(#resource{virtual_host = VHost, + name = Name}) -> + [{{{#resource{virtual_host = '$1', + name = '$2', + _ = '_'}, + '_', '_'}, + '_'}, + [{'andalso', {'==', '$1', VHost}, {'==', Name, '$2'}}], ['$_']}]. + augment_connection_pid(Consumer) -> Pid = rabbit_misc:pget(connection, Consumer), Conn = diff --git a/deps/rabbitmq_stream_management/src/rabbit_stream_publishers_mgmt.erl b/deps/rabbitmq_stream_management/src/rabbit_stream_publishers_mgmt.erl index 7899cd9c9814..567fa55cf84e 100644 --- a/deps/rabbitmq_stream_management/src/rabbit_stream_publishers_mgmt.erl +++ b/deps/rabbitmq_stream_management/src/rabbit_stream_publishers_mgmt.erl @@ -22,7 +22,8 @@ dispatcher() -> [{"/stream/publishers", ?MODULE, []}, - {"/stream/publishers/:vhost", ?MODULE, []}]. + {"/stream/publishers/:vhost", ?MODULE, []}, + {"/stream/publishers/:vhost/:queue", ?MODULE, []}]. web_ui() -> []. @@ -42,22 +43,44 @@ resource_exists(ReqData, Context) -> none -> true; % none means `all` _ -> - true + case rabbit_mgmt_util:id(queue, ReqData) of + none -> + true; + _ -> + case rabbit_mgmt_wm_queue:queue(ReqData) of + not_found -> + false; + _ -> + true + end + end end, ReqData, Context}. to_json(ReqData, Context = #context{user = User}) -> case rabbit_mgmt_util:disable_stats(ReqData) of false -> - Arg = case rabbit_mgmt_util:vhost(ReqData) of - none -> - all; - VHost -> - VHost - end, + VHost = + case rabbit_mgmt_util:vhost(ReqData) of + none -> + all; + V -> + V + end, + Queue = rabbit_mgmt_util:id(queue, ReqData), Publishers = - rabbit_mgmt_format:strip_pids( - rabbit_stream_mgmt_db:get_all_publishers(Arg)), + case {VHost, Queue} of + {VHost, none} -> + rabbit_mgmt_format:strip_pids( + rabbit_stream_mgmt_db:get_all_publishers(VHost)); + {VHost, Q} -> + QueueResource = + #resource{virtual_host = VHost, + name = Q, + kind = queue}, + rabbit_mgmt_format:strip_pids( + rabbit_stream_mgmt_db:get_stream_publishers(QueueResource)) + end, rabbit_mgmt_util:reply_list(filter_user(Publishers, User), [], ReqData, diff --git a/deps/rabbitmq_stream_management/test/http_SUITE.erl b/deps/rabbitmq_stream_management/test/http_SUITE.erl index 9f6ce45e99ec..83f288c0c671 100644 --- a/deps/rabbitmq_stream_management/test/http_SUITE.erl +++ b/deps/rabbitmq_stream_management/test/http_SUITE.erl @@ -34,13 +34,15 @@ init_per_suite(Config) -> [{rmq_nodename_suffix, ?MODULE}]), Config2 = rabbit_ct_helpers:set_config(Config1, - {rabbitmq_ct_tls_verify, verify_none}), - SetupStep = fun(StepConfig) -> - rabbit_ct_helpers:merge_app_env(StepConfig, - {rabbit, - [{collect_statistics_interval, - 500}]}) - end, + {rabbitmq_ct_tls_verify, + verify_none}), + SetupStep = + fun(StepConfig) -> + rabbit_ct_helpers:merge_app_env(StepConfig, + {rabbit, + [{collect_statistics_interval, + 500}]}) + end, rabbit_ct_helpers:run_setup_steps(Config2, [SetupStep] ++ rabbit_ct_broker_helpers:setup_steps() diff --git a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java index 360b8b9784e1..495c831b6ae2 100644 --- a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java +++ b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java @@ -393,6 +393,8 @@ void publishers() throws Exception { client.declarePublisher((byte) 0, null, stream); waitUntil(() -> request.call().size() == initialCount + 1); + assertThat(toMaps(get("/stream/publishers/%2F"))).hasSize(1); + assertThat(toMaps(get("/stream/publishers/vh1"))).isEmpty(); waitUntil(() -> entities(request.call(), client).size() == 1); Map publisher = entities(request.call(), client).get(0); @@ -427,6 +429,51 @@ void publishers() throws Exception { waitUntil(() -> entities(request.call(), client).isEmpty()); } + @Test + void publishersByStream() throws Exception { + Callable>> request = + () -> toMaps(get("/stream/publishers/%2F/" + stream)); + int initialCount = request.call().size(); + String connectionProvidedName = UUID.randomUUID().toString(); + AtomicBoolean closed = new AtomicBoolean(false); + Client client = + cf.get( + new ClientParameters() + .clientProperty("connection_name", connectionProvidedName) + .shutdownListener(shutdownContext -> closed.set(true))); + + String otherStream = UUID.randomUUID().toString(); + assertThat(client.create(otherStream).isOk()).isTrue(); + + client.declarePublisher((byte) 0, null, stream); + client.declarePublisher((byte) 1, null, otherStream); + + waitUntil(() -> toMaps(get("/stream/publishers/%2F")).size() == initialCount + 2); + waitUntil(() -> request.call().size() == initialCount + 1); + waitUntil(() -> entities(request.call(), client).size() == 1); + + Map publisher = entities(request.call(), client).get(0); + assertThat(connectionDetails(publisher)) + .containsEntry("name", connectionName(client)) + .containsEntry("user", "guest") + .containsKey("node"); + assertThat(queue(publisher)).containsEntry("name", stream).containsEntry("vhost", "/"); + + Callable>> requestOtherStream = + () -> toMaps(get("/stream/publishers/%2F/" + otherStream)); + waitUntil(() -> entities(requestOtherStream.call(), client).size() == 1); + + publisher = entities(requestOtherStream.call(), client).get(0); + assertThat(connectionDetails(publisher)) + .containsEntry("name", connectionName(client)) + .containsEntry("user", "guest") + .containsKey("node"); + assertThat(queue(publisher)).containsEntry("name", otherStream).containsEntry("vhost", "/"); + + client.deletePublisher((byte) 0); + client.deletePublisher((byte) 1); + } + @ParameterizedTest @ValueSource(strings = {"foo"}) @NullSource @@ -740,7 +787,9 @@ void permissions() throws Exception { "/stream/connections/%2F/foo-connection-name/consumers", "/stream/connections/%2F/foo-connection-name/publishers", "/stream/consumers/foo-virtual-host", - "/stream/publishers/foo-virtual-host" + "/stream/publishers/foo-virtual-host", + "/stream/publishers/foo-virtual-host", + "/stream/publishers/%2F/foo-stream" }) void shouldReturnNotFound(String endpoint) { assertThatThrownBy(() -> get(endpoint)).hasMessageContaining("404");