Skip to content

Commit

Permalink
wip: Write homogenous partitions from the partition transformer
Browse files Browse the repository at this point in the history
  • Loading branch information
lava committed May 13, 2022
1 parent a0791a9 commit ac94034
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 109 deletions.
1 change: 1 addition & 0 deletions libvast/include/vast/system/actors.hpp
Expand Up @@ -374,6 +374,7 @@ using filesystem_actor = typed_actor_fwd<
/// The interface of an BULK PARTITION actor.
using partition_transformer_actor = typed_actor_fwd<
// Persist transformed partition to given path.
// FIXME: augmented_partition_synopsis -> vector<augmented_partition_synopsis>
caf::replies_to<atom::persist, std::filesystem::path,
std::filesystem::path>::with<augmented_partition_synopsis>,
// INTERNAL: Continuation handler for `atom::done`.
Expand Down
29 changes: 18 additions & 11 deletions libvast/include/vast/system/partition_transformer.hpp
Expand Up @@ -30,8 +30,8 @@ struct partition_transformer_state {
static constexpr const char* name = "partition-transformer";

struct stream_data {
caf::expected<chunk_ptr> partition_chunk = caf::no_error;
caf::expected<chunk_ptr> synopsis_chunk = caf::no_error;
caf::expected<std::vector<chunk_ptr>> partition_chunks = caf::no_error;
caf::expected<std::vector<chunk_ptr>> synopsis_chunks = caf::no_error;
};

struct path_data {
Expand Down Expand Up @@ -60,15 +60,16 @@ struct partition_transformer_state {
accountant_actor accountant = {};

/// Actor handle of the store builder for this partition.
store_builder_actor store_builder = {};
detail::flat_map<type, store_builder_actor> store_builders = {};

/// Actor handle of the filesystem actor.
filesystem_actor fs = {};

/// The transform to be applied to the data.
transform_ptr transform = {};

/// The stream stage to send table slices to the store.
/// The stream stage to send table slices to the store(s).
// FIXME: Don't do broadcast but add a filter per type.
caf::stream_stage_ptr<table_slice,
caf::broadcast_downstream_manager<table_slice>>
stage = {};
Expand All @@ -79,22 +80,28 @@ struct partition_transformer_state {
/// Cached table slices in this partition.
std::vector<table_slice> slices = {};

/// The maximum number of events in this partition. (not really necessary, but
/// The maximum number of events per partition. (not really necessary, but
/// required by the partition synopsis)
size_t partition_capacity = 0ull;

/// Total number of rows in `slices`.
/// Total number of rows in all transformed `slices`.
size_t events = 0ull;

/// Number of rows per event type.
index_statistics stats;
/// Number of rows per event type in the input and output.
index_statistics stats_in;
index_statistics stats_out;

/// The data of the newly created partition.
active_partition_state::serialization_data data = {};
/// The data of the newly created partition(s).
detail::flat_map<type, active_partition_state::serialization_data> data = {};

/// Stores the value index for each field.
// Fields with a `#skip` attribute are stored as `nullptr`.
detail::stable_map<qualified_record_field, value_index_ptr> indexers = {};
using value_index_map
= detail::stable_map<qualified_record_field, value_index_ptr>;
detail::flat_map<qualified_record_field, value_index_map> indexers = {};

/// Store id for partitions.
std::string store_id;

/// Options for creating new synopses.
index_config synopsis_opts = {};
Expand Down

0 comments on commit ac94034

Please sign in to comment.