Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove the transformer actor #2896

Merged
merged 16 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Pipelines that reduce the number of events do not prevent `vast export`
processes that have a `max-events` limit from terminating any more.
61 changes: 0 additions & 61 deletions libvast/include/vast/detail/framed.hpp

This file was deleted.

6 changes: 0 additions & 6 deletions libvast/include/vast/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -446,12 +446,6 @@ CAF_BEGIN_TYPE_ID_BLOCK(vast_types, first_vast_type_id)
std::unordered_map<vast::uuid, vast::partition_synopsis_ptr>>))
VAST_ADD_TYPE_ID((std::vector<vast::partition_synopsis_pair>))

VAST_ADD_TYPE_ID((vast::detail::framed<vast::table_slice>))
VAST_ADD_TYPE_ID((std::vector<vast::detail::framed<vast::table_slice>>))
VAST_ADD_TYPE_ID((caf::stream<vast::detail::framed<vast::table_slice>>))
VAST_ADD_TYPE_ID(
(caf::inbound_stream_slot<vast::detail::framed<vast::table_slice>>))

VAST_ADD_TYPE_ID((caf::stream<vast::table_slice>))
VAST_ADD_TYPE_ID((caf::stream<vast::table_slice_column>))
VAST_ADD_TYPE_ID((caf::inbound_stream_slot<vast::table_slice>))
Expand Down
14 changes: 0 additions & 14 deletions libvast/include/vast/system/actors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -399,20 +399,6 @@ using datagram_source_actor = typed_actor_fwd<
// Conform to the protocol of the SOURCE actor.
::extend_with<source_actor>::unwrap_as_broker;

/// The interface of an TRANSFORMER actor.
using transformer_actor = typed_actor_fwd<
// Send transformed slices to this sink.
auto(stream_sink_actor<table_slice>)
->caf::result<caf::outbound_stream_slot<table_slice>>,
// Send transformed slices to this sink; pass the string through along with
// the stream handshake.
auto(stream_sink_actor<table_slice, std::string>, std::string)
->caf::result<void>>
// Conform to the protocol of the STREAM SINK actor for framed table slices
::extend_with<stream_sink_actor<detail::framed<table_slice>>>
// Conform to the protocol of the STATUS CLIENT actor.
::extend_with<status_client_actor>::unwrap;

/// The interface of the NODE actor.
using node_actor = typed_actor_fwd<
// Run an invocation in the node.
Expand Down
2 changes: 1 addition & 1 deletion libvast/include/vast/system/exporter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@

#include "vast/aliases.hpp"
#include "vast/expression.hpp"
#include "vast/pipeline.hpp"
#include "vast/query_context.hpp"
#include "vast/query_options.hpp"
#include "vast/system/actors.hpp"
#include "vast/system/query_status.hpp"
#include "vast/system/transformer.hpp"
#include "vast/table_slice.hpp"
#include "vast/uuid.hpp"

Expand Down
14 changes: 8 additions & 6 deletions libvast/include/vast/system/importer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@

#include "vast/aliases.hpp"
#include "vast/data.hpp"
#include "vast/detail/framed.hpp"
#include "vast/detail/heterogeneous_string_hash.hpp"
#include "vast/pipeline.hpp"
#include "vast/system/actors.hpp"
#include "vast/system/instrumentation.hpp"
#include "vast/system/transformer.hpp"
#include "vast/table_slice.hpp"

#include <caf/broadcast_downstream_manager.hpp>
#include <caf/typed_event_based_actor.hpp>
#include <caf/typed_response_promise.hpp>

Expand Down Expand Up @@ -65,12 +66,10 @@ struct importer_state {
id_block current;

/// The continous stage that moves data from all sources to all subscribers.
caf::stream_stage_ptr<
table_slice, caf::broadcast_downstream_manager<detail::framed<table_slice>>>
caf::stream_stage_ptr<table_slice,
caf::broadcast_downstream_manager<table_slice>>
stage;

transformer_actor transformer;

/// Pointer to the owning actor.
importer_actor::pointer self;

Expand All @@ -87,6 +86,9 @@ struct importer_state {

accountant_actor accountant;

/// The executor for transforming table slices in a pipeline.
pipeline_executor executor;

/// Name of this actor in log events.
static inline const char* name = "importer";
};
Expand Down
3 changes: 1 addition & 2 deletions libvast/include/vast/system/make_pipelines.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@

#include "vast/fwd.hpp"

#include "vast/system/actors.hpp"
#include "vast/system/transformer.hpp"
#include "vast/pipeline.hpp"

#include <caf/expected.hpp>
#include <caf/typed_actor.hpp>
Expand Down
2 changes: 1 addition & 1 deletion libvast/include/vast/system/make_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

#include "vast/fwd.hpp"

#include "vast/pipeline.hpp"
#include "vast/system/actors.hpp"
#include "vast/system/transformer.hpp"

#include <caf/expected.hpp>
#include <caf/fwd.hpp>
Expand Down
8 changes: 3 additions & 5 deletions libvast/include/vast/system/source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

#include "vast/fwd.hpp"

#include "vast/detail/framed.hpp"
#include "vast/expression.hpp"
#include "vast/format/reader.hpp"
#include "vast/module.hpp"
Expand Down Expand Up @@ -38,8 +37,7 @@ struct source_state {

// -- member types -----------------------------------------------------------

using downstream_manager
= caf::broadcast_downstream_manager<detail::framed<table_slice>>;
using downstream_manager = caf::broadcast_downstream_manager<table_slice>;

// -- member variables -------------------------------------------------------

Expand All @@ -55,8 +53,8 @@ struct source_state {
/// Actor for collecting statistics.
accountant_actor accountant = {};

/// Actor that receives events.
transformer_actor transformer = {};
/// The executor for transforming table slices in a pipeline.
pipeline_executor executor;

/// The `source` only supports a single sink, so we track here if we
/// already got it.
Expand Down
81 changes: 0 additions & 81 deletions libvast/include/vast/system/transformer.hpp

This file was deleted.

1 change: 0 additions & 1 deletion libvast/src/detail/add_message_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include "vast/chunk.hpp"
#include "vast/command.hpp"
#include "vast/config.hpp"
#include "vast/detail/framed.hpp"
#include "vast/detail/stable_map.hpp"
#include "vast/die.hpp"
#include "vast/expression.hpp"
Expand Down
41 changes: 10 additions & 31 deletions libvast/src/system/datagram_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
#include "vast/expression_visitors.hpp"
#include "vast/logger.hpp"
#include "vast/module.hpp"
#include "vast/pipeline.hpp"
#include "vast/system/status.hpp"
#include "vast/system/transformer.hpp"
#include "vast/table_slice.hpp"

#include <caf/attach_continuous_stream_source.hpp>
Expand All @@ -45,11 +45,12 @@ caf::behavior datagram_source(
const catalog_actor& catalog, vast::module local_module,
std::string type_filter, accountant_actor accountant,
std::vector<pipeline>&& pipelines) {
self->state.transformer
= self->spawn(transformer, "source-transformer", std::move(pipelines));
if (!self->state.transformer) {
VAST_ERROR("{} failed to spawn transformer", *self);
self->quit();
self->state.executor = pipeline_executor{std::move(pipelines)};
if (auto err = self->state.executor.validate(
pipeline_executor::allow_aggregate_pipelines::yes)) {
self->quit(caf::make_error(
ec::invalid_argument,
fmt::format("{} received an invalid pipeline: {}", *self, err)));
return {};
}
// Try to open requested UDP port.
Expand All @@ -75,8 +76,6 @@ caf::behavior datagram_source(
self->set_exit_handler([=](const caf::exit_msg& msg) {
VAST_VERBOSE("{} received EXIT from {}", *self, msg.source);
self->state.done = true;
if (self->state.mgr)
self->state.mgr->out().push(detail::framed<table_slice>::make_eof());
self->quit(msg.reason);
});
// Spin up the stream manager for the source.
Expand All @@ -87,7 +86,7 @@ caf::behavior datagram_source(
self->state.start_time = std::chrono::system_clock::now();
},
// get next element
[](caf::unit_t&, caf::downstream<detail::framed<table_slice>>&, size_t) {
[](caf::unit_t&, caf::downstream<table_slice>&, size_t) {
// nop, new slices are generated in the new_datagram_msg handler
},
// done?
Expand Down Expand Up @@ -155,10 +154,8 @@ caf::behavior datagram_source(
}
});
}
// Start streaming.
self->state.mgr->add_outbound_path(self->state.transformer);
auto name = std::string{self->state.reader->name()};
self->delegate(self->state.transformer, sink, name);
self->state.mgr->add_outbound_path(
sink, std::make_tuple(self->state.reader->name()));
},
[self](atom::get, atom::module) -> caf::result<module> {
return self->state.reader->module();
Expand All @@ -181,24 +178,6 @@ caf::behavior datagram_source(
// General state such as open streams.
if (v >= status_verbosity::debug)
detail::fill_status_map(src, self);
const auto timeout = defaults::system::status_request_timeout / 5 * 4;
collect_status(
rs, timeout, v, self->state.transformer,
[rs, src](record& response) mutable {
src["transformer"] = std::move(response);
auto xs = list{};
xs.emplace_back(std::move(src));
rs->content["sources"] = std::move(xs);
},
[rs, src](const caf::error& err) mutable {
VAST_WARN("{} failed to retrieve status for the key transformer: "
"{}",
*rs->self, err);
src["transformer"] = fmt::to_string(err);
auto xs = list{};
xs.emplace_back(std::move(src));
rs->content["sources"] = std::move(xs);
});
}
return rs->promise;
},
Expand Down