diff --git a/changelog/unreleased/bug-fixes/1852--in-process-sources-status.md b/changelog/unreleased/bug-fixes/1852--in-process-sources-status.md new file mode 100644 index 00000000000..e82041b8a20 --- /dev/null +++ b/changelog/unreleased/bug-fixes/1852--in-process-sources-status.md @@ -0,0 +1,3 @@ +The output of VAST status now includes status information for sources and sinks +spawned in the VAST node, i.e., via `vast spawn source|sink ` rather +than `vast import|export `. diff --git a/libvast/src/system/datagram_source.cpp b/libvast/src/system/datagram_source.cpp index 7da4dcbf970..336b32dc8f0 100644 --- a/libvast/src/system/datagram_source.cpp +++ b/libvast/src/system/datagram_source.cpp @@ -175,10 +175,21 @@ caf::behavior datagram_source( if (v >= status_verbosity::debug) detail::fill_status_map(src, self); const auto timeout = defaults::system::initial_request_timeout / 5 * 4; - collect_status(rs, timeout, v, self->state.transformer, src, - "transformer"); - auto& xs = put_list(rs->content, "sources"); - xs.emplace_back(std::move(src)); + collect_status( + rs, timeout, v, self->state.transformer, + [rs, src](caf::settings& response) mutable { + put(src, "transformer", std::move(response)); + auto& xs = put_list(rs->content, "sources"); + xs.emplace_back(std::move(src)); + }, + [rs, src](const caf::error& err) mutable { + VAST_WARN("{} failed to retrieve status for the key transformer: " + "{}", + *rs->self, err); + put(src, "transformer", fmt::to_string(err)); + auto& xs = put_list(rs->content, "sources"); + xs.emplace_back(std::move(src)); + }); } return rs->promise; }, diff --git a/libvast/src/system/node.cpp b/libvast/src/system/node.cpp index d36656219cb..83a8a593415 100644 --- a/libvast/src/system/node.cpp +++ b/libvast/src/system/node.cpp @@ -178,9 +178,9 @@ void collect_component_status(node_actor::stateful_pointer self, const auto timeout = defaults::system::initial_request_timeout; // Send out requests and collects answers. for (const auto& [label, component] : self->state.registry.components()) { - // Requests to busy sources and sinks can easily delay the combined response - // because the status requests don't get scheduled soon enough. - if (component.type == "source" || component.type == "sink") + // Requests to busy remote sources and sinks can easily delay the combined + // response because the status requests don't get scheduled soon enough. + if (component.actor.home_system().node() != self->home_system().node()) continue; collect_status(rs, timeout, v, component.actor, rs->content, component.type); diff --git a/libvast/src/system/source.cpp b/libvast/src/system/source.cpp index 2a184d7a3a9..82cd2315d12 100644 --- a/libvast/src/system/source.cpp +++ b/libvast/src/system/source.cpp @@ -328,10 +328,21 @@ source(caf::stateful_actor* self, format::reader_ptr reader, if (v >= status_verbosity::debug) detail::fill_status_map(src, self); const auto timeout = defaults::system::initial_request_timeout / 5 * 4; - collect_status(rs, timeout, v, self->state.transformer, src, - "transformer"); - auto& xs = put_list(rs->content, "sources"); - xs.emplace_back(std::move(src)); + collect_status( + rs, timeout, v, self->state.transformer, + [rs, src](caf::settings& response) mutable { + put(src, "transformer", std::move(response)); + auto& xs = put_list(rs->content, "sources"); + xs.emplace_back(std::move(src)); + }, + [rs, src](const caf::error& err) mutable { + VAST_WARN("{} failed to retrieve status for the key transformer: " + "{}", + *rs->self, err); + put(src, "transformer", fmt::to_string(err)); + auto& xs = put_list(rs->content, "sources"); + xs.emplace_back(std::move(src)); + }); } return rs->promise; }, diff --git a/libvast/vast/system/status.hpp b/libvast/vast/system/status.hpp index ed8d3378c35..3bb277a3e73 100644 --- a/libvast/vast/system/status.hpp +++ b/libvast/vast/system/status.hpp @@ -110,10 +110,10 @@ void collect_status( ->template request( responder, caf::duration{timeout}, atom::status_v, verbosity) .then( - [rs, f = std::forward(f)](caf::settings& response) { + [rs, f = std::forward(f)](caf::settings& response) mutable { f(response); }, - [rs, fe = std::forward(fe)](caf::error& err) { + [rs, fe = std::forward(fe)](caf::error& err) mutable { fe(err); }); }