Skip to content

Commit

Permalink
Merge pull request #1852
Browse files Browse the repository at this point in the history
Include in-process sources/sinks in status output
  • Loading branch information
dominiklohmann committed Aug 19, 2021
2 parents 59e48c3 + 0d21b45 commit b984a2e
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 13 deletions.
@@ -0,0 +1,3 @@
The output of VAST status now includes status information for sources and sinks
spawned in the VAST node, i.e., via `vast spawn source|sink <format>` rather
than `vast import|export <format>`.
19 changes: 15 additions & 4 deletions libvast/src/system/datagram_source.cpp
Expand Up @@ -175,10 +175,21 @@ caf::behavior datagram_source(
if (v >= status_verbosity::debug)
detail::fill_status_map(src, self);
const auto timeout = defaults::system::initial_request_timeout / 5 * 4;
collect_status(rs, timeout, v, self->state.transformer, src,
"transformer");
auto& xs = put_list(rs->content, "sources");
xs.emplace_back(std::move(src));
collect_status(
rs, timeout, v, self->state.transformer,
[rs, src](caf::settings& response) mutable {
put(src, "transformer", std::move(response));
auto& xs = put_list(rs->content, "sources");
xs.emplace_back(std::move(src));
},
[rs, src](const caf::error& err) mutable {
VAST_WARN("{} failed to retrieve status for the key transformer: "
"{}",
*rs->self, err);
put(src, "transformer", fmt::to_string(err));
auto& xs = put_list(rs->content, "sources");
xs.emplace_back(std::move(src));
});
}
return rs->promise;
},
Expand Down
6 changes: 3 additions & 3 deletions libvast/src/system/node.cpp
Expand Up @@ -178,9 +178,9 @@ void collect_component_status(node_actor::stateful_pointer<node_state> self,
const auto timeout = defaults::system::initial_request_timeout;
// Send out requests and collects answers.
for (const auto& [label, component] : self->state.registry.components()) {
// Requests to busy sources and sinks can easily delay the combined response
// because the status requests don't get scheduled soon enough.
if (component.type == "source" || component.type == "sink")
// Requests to busy remote sources and sinks can easily delay the combined
// response because the status requests don't get scheduled soon enough.
if (component.actor.home_system().node() != self->home_system().node())
continue;
collect_status(rs, timeout, v, component.actor, rs->content,
component.type);
Expand Down
19 changes: 15 additions & 4 deletions libvast/src/system/source.cpp
Expand Up @@ -328,10 +328,21 @@ source(caf::stateful_actor<source_state>* self, format::reader_ptr reader,
if (v >= status_verbosity::debug)
detail::fill_status_map(src, self);
const auto timeout = defaults::system::initial_request_timeout / 5 * 4;
collect_status(rs, timeout, v, self->state.transformer, src,
"transformer");
auto& xs = put_list(rs->content, "sources");
xs.emplace_back(std::move(src));
collect_status(
rs, timeout, v, self->state.transformer,
[rs, src](caf::settings& response) mutable {
put(src, "transformer", std::move(response));
auto& xs = put_list(rs->content, "sources");
xs.emplace_back(std::move(src));
},
[rs, src](const caf::error& err) mutable {
VAST_WARN("{} failed to retrieve status for the key transformer: "
"{}",
*rs->self, err);
put(src, "transformer", fmt::to_string(err));
auto& xs = put_list(rs->content, "sources");
xs.emplace_back(std::move(src));
});
}
return rs->promise;
},
Expand Down
4 changes: 2 additions & 2 deletions libvast/vast/system/status.hpp
Expand Up @@ -110,10 +110,10 @@ void collect_status(
->template request<caf::message_priority::high>(
responder, caf::duration{timeout}, atom::status_v, verbosity)
.then(
[rs, f = std::forward<F>(f)](caf::settings& response) {
[rs, f = std::forward<F>(f)](caf::settings& response) mutable {
f(response);
},
[rs, fe = std::forward<Fe>(fe)](caf::error& err) {
[rs, fe = std::forward<Fe>(fe)](caf::error& err) mutable {
fe(err);
});
}
Expand Down

0 comments on commit b984a2e

Please sign in to comment.