Skip to content

Commit

Permalink
Add HTTP endpoint for stream publishers
Browse files Browse the repository at this point in the history
References #3389
  • Loading branch information
acogoluegnes committed Sep 10, 2021
1 parent 0038573 commit b570075
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 24 deletions.
2 changes: 1 addition & 1 deletion deps/rabbitmq_management/priv/www/js/dispatcher.js
Expand Up @@ -109,7 +109,7 @@ dispatcher_add(function(sammy) {
var extra = QUEUE_EXTRA_CONTENT_REQUESTS[i](vhost, queue);
for (key in extra) {
if(extra.hasOwnProperty(key)){
requests[key] = "/stream/publishers";
requests[key] = extra[key];
}
}
}
Expand Down
14 changes: 10 additions & 4 deletions deps/rabbitmq_stream_management/priv/www/js/stream.js
Expand Up @@ -64,14 +64,20 @@ CONSUMER_OWNER_FORMATTERS.push({
CONSUMER_OWNER_FORMATTERS.sort(CONSUMER_OWNER_FORMATTERS_COMPARATOR);

QUEUE_EXTRA_CONTENT_REQUESTS.push(function(vhost, queue) {
return {'extra_stream_publishers' : '/stream/publishers'};
return {'extra_stream_publishers' : '/stream/publishers/' + esc(vhost) + '/' + esc(queue)};
});

QUEUE_EXTRA_CONTENT.push(function(queue, extraContent) {
if (is_stream(queue)) {
return '<div class="section"><h2>Stream publishers</h2><div class="hider updatable">' +
format('streamPublishersList', {'publishers': extraContent['extra_stream_publishers']}) +
'</div></div>';
var publishers = extraContent['extra_stream_publishers'];
if (publishers !== undefined) {
return '<div class="section"><h2>Stream publishers</h2><div class="hider updatable">' +
format('streamPublishersList', {'publishers': publishers}) +
'</div></div>';

} else {
return '';
}
} else {
return '';
}
Expand Down
31 changes: 30 additions & 1 deletion deps/rabbitmq_stream_management/src/rabbit_stream_mgmt_db.erl
Expand Up @@ -17,7 +17,8 @@
get_all_publishers/1]).
-export([entity_data/4]).
-export([get_connection_consumers/1,
get_connection_publishers/1]).
get_connection_publishers/1,
get_stream_publishers/1]).

get_all_consumers(VHosts) ->
rabbit_mgmt_db:submit(fun(_Interval) -> consumers_stats(VHosts) end).
Expand All @@ -35,6 +36,11 @@ get_connection_publishers(ConnectionPid) when is_pid(ConnectionPid) ->
connection_publishers_stats(ConnectionPid)
end).

get_stream_publishers(QueueResource) ->
rabbit_mgmt_db:submit(fun(_Interval) ->
stream_publishers_stats(QueueResource)
end).

consumers_stats(VHost) ->
Data =
rabbit_mgmt_db:get_data_from_nodes({rabbit_stream_mgmt_db,
Expand Down Expand Up @@ -67,6 +73,14 @@ connection_publishers_stats(ConnectionPid) ->
fun publishers_by_connection/1]}),
[V || {_, V} <- maps:to_list(Data)].

stream_publishers_stats(Queue) ->
Data =
rabbit_mgmt_db:get_data_from_nodes({rabbit_stream_mgmt_db,
entity_data,
[Queue, ?ENTITY_PUBLISHER,
fun publishers_by_stream/1]}),
[V || {_, V} <- maps:to_list(Data)].

entity_data(_Pid, Param, EntityType, QueryFun) ->
maps:from_list([begin
AugmentedPublisher = augment_entity(EntityType, P),
Expand Down Expand Up @@ -103,12 +117,27 @@ consumers_by_connection(ConnectionPid) ->
publishers_by_connection(ConnectionPid) ->
get_entity_stats(?TABLE_PUBLISHER, ConnectionPid).

publishers_by_stream(QueueResource) ->
get_entity_stats_by_resource(?TABLE_PUBLISHER, QueueResource).

get_entity_stats(Table, Id) ->
ets:select(Table, match_entity_spec(Id)).

match_entity_spec(ConnectionId) ->
[{{{'_', '$1', '_'}, '_'}, [{'==', ConnectionId, '$1'}], ['$_']}].

get_entity_stats_by_resource(Table, Resource) ->
ets:select(Table, match_entity_spec_by_resource(Resource)).

match_entity_spec_by_resource(#resource{virtual_host = VHost,
name = Name}) ->
[{{{#resource{virtual_host = '$1',
name = '$2',
_ = '_'},
'_', '_'},
'_'},
[{'andalso', {'==', '$1', VHost}, {'==', Name, '$2'}}], ['$_']}].

augment_connection_pid(Consumer) ->
Pid = rabbit_misc:pget(connection, Consumer),
Conn =
Expand Down
Expand Up @@ -22,7 +22,8 @@

dispatcher() ->
[{"/stream/publishers", ?MODULE, []},
{"/stream/publishers/:vhost", ?MODULE, []}].
{"/stream/publishers/:vhost", ?MODULE, []},
{"/stream/publishers/:vhost/:queue", ?MODULE, []}].

web_ui() ->
[].
Expand All @@ -42,22 +43,44 @@ resource_exists(ReqData, Context) ->
none ->
true; % none means `all`
_ ->
true
case rabbit_mgmt_util:id(queue, ReqData) of
none ->
true;
_ ->
case rabbit_mgmt_wm_queue:queue(ReqData) of
not_found ->
false;
_ ->
true
end
end
end,
ReqData, Context}.

to_json(ReqData, Context = #context{user = User}) ->
case rabbit_mgmt_util:disable_stats(ReqData) of
false ->
Arg = case rabbit_mgmt_util:vhost(ReqData) of
none ->
all;
VHost ->
VHost
end,
VHost =
case rabbit_mgmt_util:vhost(ReqData) of
none ->
all;
V ->
V
end,
Queue = rabbit_mgmt_util:id(queue, ReqData),
Publishers =
rabbit_mgmt_format:strip_pids(
rabbit_stream_mgmt_db:get_all_publishers(Arg)),
case {VHost, Queue} of
{VHost, none} ->
rabbit_mgmt_format:strip_pids(
rabbit_stream_mgmt_db:get_all_publishers(VHost));
{VHost, Q} ->
QueueResource =
#resource{virtual_host = VHost,
name = Q,
kind = queue},
rabbit_mgmt_format:strip_pids(
rabbit_stream_mgmt_db:get_stream_publishers(QueueResource))
end,
rabbit_mgmt_util:reply_list(filter_user(Publishers, User),
[],
ReqData,
Expand Down
16 changes: 9 additions & 7 deletions deps/rabbitmq_stream_management/test/http_SUITE.erl
Expand Up @@ -34,13 +34,15 @@ init_per_suite(Config) ->
[{rmq_nodename_suffix, ?MODULE}]),
Config2 =
rabbit_ct_helpers:set_config(Config1,
{rabbitmq_ct_tls_verify, verify_none}),
SetupStep = fun(StepConfig) ->
rabbit_ct_helpers:merge_app_env(StepConfig,
{rabbit,
[{collect_statistics_interval,
500}]})
end,
{rabbitmq_ct_tls_verify,
verify_none}),
SetupStep =
fun(StepConfig) ->
rabbit_ct_helpers:merge_app_env(StepConfig,
{rabbit,
[{collect_statistics_interval,
500}]})
end,
rabbit_ct_helpers:run_setup_steps(Config2,
[SetupStep]
++ rabbit_ct_broker_helpers:setup_steps()
Expand Down
Expand Up @@ -393,6 +393,8 @@ void publishers() throws Exception {

client.declarePublisher((byte) 0, null, stream);
waitUntil(() -> request.call().size() == initialCount + 1);
assertThat(toMaps(get("/stream/publishers/%2F"))).hasSize(1);
assertThat(toMaps(get("/stream/publishers/vh1"))).isEmpty();
waitUntil(() -> entities(request.call(), client).size() == 1);

Map<String, Object> publisher = entities(request.call(), client).get(0);
Expand Down Expand Up @@ -427,6 +429,51 @@ void publishers() throws Exception {
waitUntil(() -> entities(request.call(), client).isEmpty());
}

@Test
void publishersByStream() throws Exception {
Callable<List<Map<String, Object>>> request =
() -> toMaps(get("/stream/publishers/%2F/" + stream));
int initialCount = request.call().size();
String connectionProvidedName = UUID.randomUUID().toString();
AtomicBoolean closed = new AtomicBoolean(false);
Client client =
cf.get(
new ClientParameters()
.clientProperty("connection_name", connectionProvidedName)
.shutdownListener(shutdownContext -> closed.set(true)));

String otherStream = UUID.randomUUID().toString();
assertThat(client.create(otherStream).isOk()).isTrue();

client.declarePublisher((byte) 0, null, stream);
client.declarePublisher((byte) 1, null, otherStream);

waitUntil(() -> toMaps(get("/stream/publishers/%2F")).size() == initialCount + 2);
waitUntil(() -> request.call().size() == initialCount + 1);
waitUntil(() -> entities(request.call(), client).size() == 1);

Map<String, Object> publisher = entities(request.call(), client).get(0);
assertThat(connectionDetails(publisher))
.containsEntry("name", connectionName(client))
.containsEntry("user", "guest")
.containsKey("node");
assertThat(queue(publisher)).containsEntry("name", stream).containsEntry("vhost", "/");

Callable<List<Map<String, Object>>> requestOtherStream =
() -> toMaps(get("/stream/publishers/%2F/" + otherStream));
waitUntil(() -> entities(requestOtherStream.call(), client).size() == 1);

publisher = entities(requestOtherStream.call(), client).get(0);
assertThat(connectionDetails(publisher))
.containsEntry("name", connectionName(client))
.containsEntry("user", "guest")
.containsKey("node");
assertThat(queue(publisher)).containsEntry("name", otherStream).containsEntry("vhost", "/");

client.deletePublisher((byte) 0);
client.deletePublisher((byte) 1);
}

@ParameterizedTest
@ValueSource(strings = {"foo"})
@NullSource
Expand Down Expand Up @@ -740,7 +787,9 @@ void permissions() throws Exception {
"/stream/connections/%2F/foo-connection-name/consumers",
"/stream/connections/%2F/foo-connection-name/publishers",
"/stream/consumers/foo-virtual-host",
"/stream/publishers/foo-virtual-host"
"/stream/publishers/foo-virtual-host",
"/stream/publishers/foo-virtual-host",
"/stream/publishers/%2F/foo-stream"
})
void shouldReturnNotFound(String endpoint) {
assertThatThrownBy(() -> get(endpoint)).hasMessageContaining("404");
Expand Down

0 comments on commit b570075

Please sign in to comment.